From c4cfca50a43e390de638fbdaf70f8a46bc70c019 Mon Sep 17 00:00:00 2001 From: bunny <1319900154@qq.com> Date: Mon, 19 May 2025 14:04:17 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B6=88=E8=B4=B9=E7=AB=AF?= =?UTF-8?q?=E9=99=90=E6=B5=81=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq-demo/ReadMe.md | 95 +++++++++++++++++- mq-demo/images/image-20250519135056806.png | Bin 0 -> 7902 bytes mq-demo/images/image-20250519135131274.png | Bin 0 -> 7902 bytes .../RabbitMQMessageListenerConstants.java | 1 + .../mq/listener/MessageListenerOrder.java | 90 +++++++++-------- mq-demo/src/main/resources/application.yaml | 1 + .../mq/mqdemo/MqDemoApplicationTests.java | 16 ++- 7 files changed, 156 insertions(+), 47 deletions(-) create mode 100644 mq-demo/images/image-20250519135056806.png create mode 100644 mq-demo/images/image-20250519135131274.png diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index 14aa493..51f792c 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -216,6 +216,7 @@ rabbitmq: username: ${bunny.rabbitmq.username} password: ${bunny.rabbitmq.password} virtual-host: ${bunny.rabbitmq.virtual-host} + # 需要注释下面这两个,不需要这两个,因为要手动确认 # publisher-confirm-type: correlated # 交换机确认 # publisher-returns: true # 队列确认 listener: @@ -266,4 +267,96 @@ public void processQueue(String dataString, Message message, Channel channel) th throw new RuntimeException(e); } } -``` \ No newline at end of file +``` + +## 消费端限流 + +### 设置方式 + +在配置文件中设置`prefetch`值。如果不设置,当生产者将消息放置到RabbitMQ中时,是一次性取回的,无论有多少。 + +设置了`prefetch`之后,每次取回数量就是`prefetch`的数量。 + +> [!NOTE] +> +> 并且在UI界面中`Unacked`值和我们设置的值一致。 +> +> ![image-20250519135056806](./images/image-20250519135056806.png) +> +> *图中表示表示当前有5条消息已被消费者获取但未确认(正在处理中)* +> +> 当`prefetch=5`且消费速度为1条/秒时: +> +> - 初始会立即获取5条消息(Unacked=5) +> - 每ACK 1条后,Broker会立即推送1条新消息(动态保持Unacked≈5) + +```yaml + rabbitmq: + host: ${bunny.rabbitmq.host} + port: ${bunny.rabbitmq.port} + username: ${bunny.rabbitmq.username} + password: ${bunny.rabbitmq.password} + virtual-host: ${bunny.rabbitmq.virtual-host} + # publisher-confirm-type: correlated # 交换机确认 + # publisher-returns: true # 队列确认 + listener: + simple: + acknowledge-mode: manual # 手动处理消息 + prefetch: 5 # 设置每次取回数量,消息条数(非字节或KB) +``` + +> [!IMPORTANT] +> **RabbitMQ Prefetch 机制(prefetch=5)** +> +> 在 RabbitMQ 的 **prefetch(QoS,服务质量设置)** 机制下,当 `prefetch=5` 时,**消费端的行为** 取决于 **消息确认模式(Ack/Nack)** 和 **消费速度** +> +> **核心规则**: +> - 保持 `unacked` 消息数 **≤ prefetch (5)** +> - **不会** 等5条全部ACK完才发下一批,而是 **动态补充**(每ACK 1条,补发1条) +> +> **不同模式对比**: +> | 模式 | 行为 | +> | -------------------------------------- | --------------------------------------------- | +> | **手动ACK** (`AcknowledgeMode.MANUAL`) | ✔️ 推荐!保持 `unacked ≤ 5`,ACK后立即补新消息 | +> | **自动ACK** (`AcknowledgeMode.AUTO`) | ⚠️ 无效!消息投递后立即ACK,prefetch无法限流 | +> +> >自动ACK模式下**prefetch仍然有效**(限制未处理的消息数),但消息会在投递后立即被ACK,实际可能失去限流意义。 +> +> **消费慢时的表现**: +> +> - 若消费速度=1条/秒,RabbitMQ会 **持续补消息**,始终维持 `unacked ≈ 5` +> + +### 测试方式 + +生产者生产一定数量的消息。 + +```java +/* 发送消息,发送多条消息,测试使用 */ +@Test +void buildMessageTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】"); + } +} +``` + +消费者进行消费消息,在消费的时候为了方便观察,每秒去读一个。 + +```java +@RabbitListener(queues = {QUEUE_NAME}) +public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("消费者 消息内容:{}", dataString); + + TimeUnit.SECONDS.sleep(1); + + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); +} +``` + +## 消息超时 + +![image-20250519135919549](./images/image-20250519135919549.png) \ No newline at end of file diff --git a/mq-demo/images/image-20250519135056806.png b/mq-demo/images/image-20250519135056806.png new file mode 100644 index 0000000000000000000000000000000000000000..7c34c3fcb3264ef51ce0f148b411a39593d4293c GIT binary patch literal 7902 zcma)hXIN8B*KTZ}bmRe4sv&@ifCADxL7H@>6N>Z_I-wV7K?I~oFVb76p$MT!QIX!O zA<}!V0SRaOyw~^Ry}s*Q=jxpSBG)s%MG zmR`O+Z9!eXX@=^4ZU01F;bfzI_VcQF!kMJ^?K{w}P)@12N~x`-pNESyX?p{!7k-D3 zw{Un2V4E~~svSLW|JX{^3EWC$$Zmi@#YE4^K%mbLx}JeRL3aPUAnj=#sUj~Wv$zH( z^b6OcM;dUc$wP816^K>!GXoI_q(N?e4`})Y(hO*F9O6nA6?5vl&>GY zIyvJYcJNk{2_$_tin((D);f!xGx#~mgPbNzrOjk%A|AHGS~c&VuyCV}zWT&WtpvCyZ8 zTGkpCZ1R(bfM+JyUyFFpTH*!8NW`cN6~~QT9UuBqlyATHdo-~*IH~@c=^%^Eq95Nc zcUppK9%IhMxVYk6=rO4*eWkay^Y)$8wu=&e7(?CtvZIPW51gRW{!~eP9A0y6T*_qp z+=ix*x;;IA_S3)3)Ob5O;UY%c5j)nsClIz}=QC!RV(n;+*kN#R za%^RH%7msFzk2JapHJupNYPkxfIMoT46JKO!W6nvTJJVjQu1gvqfSqa-s8?Y*4xJC z*9>d)0w^rHmZS1XwS?_s!n3U|0 z@K7dEGjao;&Mld7MNt^;ak5UOcVqh8W`!z7gSCs)GDST$f?Rygc3&1n+V~uuDdsef z=_r_}YvTzwUV<0@tzwYU&PVU4>1i&ZcJ;#Rq62IlYJAm2VOy3rzu1-utEt@!QHu>Q zWrFo?h-wJC`nj*GCJXq#-3oRgH82Ay`VOY#zR8ajr`@7XenmIz{buQ8$BMk9q|~5! z<3r63JMZKeYo+}R2dwD{kI}tb4K454zzbi%xf@Tk_v(e&?^26Y!U`Q#QO3)_RO)7^ z-IeE!%>6^xF*Xbb-)&*mntS8m=!8>-W;Du4UiQCnTwUVC{Pyn0tTewGS4BmxzM4Hw z&YVPcJ=VliyR4jqTV|;Q`UmdyZEgmd!=^_ctndUc>rD`OU++#}lq)luKH z+HehYcZ9#t#=9BpNKP6L=CP|m7IFVs|IOsDJ%)|CK^MU46)}rJUGh$Tg2V31721IZ z{BrHzC)3wFcdlrkV?V(D0ldbVj)|^UdJo&__c*GB?O|^3!Ag7Q(1hm&EJ{8A*5h@? z_aXb&DXH#WOdPV2wzXF6@6r;JT<$izxPmNk=?9=D`$=d1cFrCgp3%;Y1auY{n@F5_1fBrTC5<2{p0vD zHpRtoVriu-gGF~EW0)fpUw6AKS$5XuF4i8bbB=mmT6#_-NsRdFKn7}!d0|aG6CSXG ziaq_*(o%W61wVBhx@hwLQ;tNhdLch*YHHnX*Eh`bN!hy2LB{Sda-KOmk_-G1jFf=6 z8-{l?N~yi?~0qCHJd`5&mrdXzk~t z)lc;M=K)Y*khI2;jIB0iXuOvzq;K5#o$=0AeLcc4A;PWfRM4e8CfUAf_NV#0H+~0( zNku+Dsp?KY1bl9t$pSS(L5`f|$y+=J7!W)y*Ag&0M&Md@I; zE1~cHZBwgY@LOYJXFj(n8T$aMJj%qLgH4AQ!-%G@N5$jks!Z|sKtV_(-^$8LQr*Hk z$_E86tSP0w%Ex%|lD3-JrZQXyeFPpLc=i%P}qPhS7 z0*lE?6J1PbZOH#_#G=F)pxhpaQd(-N(l)QjV0U--zyPp&?C7u0C((AsL;U~j|2RHA zmY0{`+L`Z+z`}*7>+0(6N52HSHX5<`A5l}^CkvO0pqLL71VL^o#a|DWR#Q{6vU(Cr zL3BNMdRkv8e(#6Ku((*bek(9*(hO**fqCQbdIGCjt|1P=3-c zg!4PaBBo!hkBp4GdCUx2=srB9!0DP04l9{@?qXvF*99FCot&nM|B!N3=UBTo%MbcY zH;o)%Xt+1Wcx>+%W{BmA8`zvXF3DO`;{3y~sGYkrs`I3g$S*CPzBlMM%@aD&Z>MYo z!ZX{3?dk;6w^PfmNgo^l=3Z>5fJZKE@^-lz8BY9<*$ow;wR_7kp|Hj@td;8rOymT$ zeWHpe_Mfy-h=85lGD7ZcPMj;%Yhq;Ev4+$tMp4|p zx_PxK`&q9yrd(z1S&5$EyRXY)AXDnBHU!nMiQiCWgG)lq5IvuCKm1V%BYRHN27;?0 zZYb)W#478dBKfU^I)^T)cA7i=X_*R}90tb%J~82!ga;zAA*2i{q9v;pZI(=%MUlv( za*36JnE4BC?oH&*sfVB4T4R|N5zTPoqxh`1jlJtpYe!@MXr|S-s7RCDkQOeW9WZd) ztUhDcB%)q!Zhns7UKdPB{$&vvdP0_&Zo&)6@sOHvJLe|o{QG}@ZYFQ@r4)hV*hfg)JcqL4kS-2{PBtcXtgMRE?xkv* zuX^mp4QydJ$G}LxcJh#!E3Bn&yKeBbO}7cpmAJw-0?1droAzOc1irn95)({54dRaLuQbt{IUz_S zG~%22`Z(#f;W+o}zd~A1+BPh(>XR#sFsu^TlQ12Zv54=}#^TpYJ9v^NLzO0MPyE1 zg-b)=M^5m;$qUNK;|40~Pxq&a>n=Dn?^i#xFBRiFIuwuFvnNi^{kG?nZZG9=FJ??0 zdbB|!&rAE}C%y+zPHHB1Z0!{li8t1+>30BX$IucXBV6tCAH8zGs=50uj;ARF+OCQ_Aj#U#H{G!?ayPH|H z{JRWvMS}DP1KBcBZq~$2MY9EA3GxijtvS0DWLH@-^vQxS*!AQ@LoA(xTa>86d3Jj} zy$_g|l!hwWxt0&y9o(OWa`sUW0e_m1w1l`;j}F{}rXZH13u5{}q>XWJXdAmLr(YWm z$BSd$$~M`#RF%55>RmE0X6EEGXg*Jjl$8=P(p#(BYrk7o{!YYS`yMdPhf}B(me*A} zUZZ^VdKE)nS?V@IJc3ki9T%BnenQj+X5swVRA4LOr%lgP0q&CiUijRebcGOVXvRf$ z2k87qH4W@YRDTb=H%;@@E$n50v7wB>hE& zgo-@5T@vHs;xaSK$L+>>Zgv3Hm=ZQVtxu5fZ_OHvS6m!nJ?>#~5J!1(gKUYPlYN|MJ^M^r{w(M{5QEyPN^%(m>)exO zLYf5~(fa(PsO} z=hU&92pS8$N2Dm0W3u$wlY7r$F0UZ%Kyug3iBrH%AUeWC&p+QFra1GHREkIGa2YPRHZEvzg1W*FyfSo zsP83BtMmN9%ZIMI%7YR!U%Ks1FWxasis~#+bKjV>pnvzVeO^cSk+HtMh``^={m+?8 zrW#^@;ag2jO-@b@n?j6HleeZd85u4Ihd%SPMU>U=c7AHy{>sOg@pEp0irvp_ z4wY_CDy99}VQ$NlJ0N=62%vXV3R-Mi~xpS|BQnwpwx=dmnO zfSbl>_JgWqvvSK8c#sI9HyhutXnVrw_vc)c4& z0wGe+GjPk7PhEQz#u(4-8F?sAvc0M4?;@$ob$%gzv=yLv2Hrop88oF)0V|oUTCc17 zWS5*kN5(Lz;CmKevVKm1a5lDdNqq=b*mX}Fi<|F;ur%$B8uSO?t>M~i^ zle#(%Ft=W`$X?vxFY?s9vm9onOixdbR%!oM!3;tse=O52JhntHSZ3xpwEI_HSdAU; zo_?y>eZ=tZN?@7gy4i%VaoK8ov1gp=gNdP8vVz?qgCBmek7n*;c+a2<-^Yf!yo(CQ zc-V$fknq6_k;Owia&4}~7kbpgRhjrdvWiP?6!cM8*3ita$jomKbNWSWjw0YI;^;C) zEPSOQHXLj}VFK)*VhF#Og)+VOR8l1B!fkQwhf>DeG}y!9#FFW88`k@Q-pD zHCc22YL|?+ni4)dW`#B~0$leK1CG{)F}tlQ(8`t*0|cT$r_6sbT8M&0a?4p5SzKOD zo=ickDB$2qXE6U|;eyk*M@3nH2@17XYZPlA!1BP2RR*(SW{%S+z%!;AO=kX^UEPkV z<>8ja-uWKw)I_@{i4~Iq6aWu z0{H0rTc|=#^(>S=nxlsV5c#+oOO-_0tP8hJAVmeFgz8)GaE7zZA25MR)bU{A+K#H3 z`g&4X%#1ulpzE3Rd+iEXB)Vg7fo>f!|Lu3$2iLR#z6}3_#rLlW)YvHeq?a{|(w-(#)=RGy=un4d&|tkL7AR zQ~;3~50UUc7noyV(A0hT@};MzprmSA5CF4}RYbUq_Oee(j8$Yu?drNYHAn%_)}p)k zYcD9041lb(#4_DnS|UXYpwF=`p7b!Cd#vpMd}~nwrdk98rphE(t5@^7N^_BY{0+2O z@*S~y(xyh>r@s8C{@KX|6eRz*I{ClaDhT*`f8Gt~S>84*s-N6$GG5@nkUO!%z|67z z*8>XON+Y`e3TSGg^p)d7A!-68f9#1rhU;+|O}r2d~m+eUEpt*>rq)D=QxTI? z)SCSf5)8jC+ak&I1FEu0csKBQ zLdUO^t!)hFHSqb)&$e$CPE-tt@4I)`BazKJ=}-nXoqLXBW7w_6O(*>y@G{M%yRVpx z$Nhu?<@GiCU7+ANgxz~&RFh*GASxiw3upA;C)4t}7e%6tVXZ}{Px|1?I1Y3k;#LptMrMov)r3MP)(F+we- zOf@jS*^v}c{i7C6oZbc#U%yUc3yET&tL(fDmm24S-=9a#1Hrn`ix)uFAi4L2s3<_C z?RwkyK}iP@xGNU$6T&VFfso_(ldzlB7iP7Wt2Yt;eVeQCN=^Su8dYbMmZbDk!%4n* zr#mJkRztfiuXpM`D4;a?*G_++do1q)nZY1M=xZ$dl{nXnw!Nfp+2hvna@RTf~3U( z5e&ep`&rMtH0#H>YUPAoQ#_yfG51i)V{*_>#rS;S^`}ptW|SCj-?Xx}=I7_HGjL2C zuP|wAwd_rLgCaGj2aZydl;iUOAQ~gQP5D<7Gf9L{cb0zrha04b0CEbc_?uLmCE~Y% z)0TfB5;%I%pZHu0QDO=FC=7K7LTEV$j5e5VfVG6_X^Qj4TK@u#QWCS{wJxO^Q^f& zJw5W3)O3xquH%D(j0_Cg%+X4hWg32Kyy|>}y@Zy4WoaODGSJ%_<#-8tygU8852n`G zc!;e`=XY!>RVj(74&F8ine!LRU|2%+%SlnOqr^ zCvEFYcZQMDOX8Pk0NJ=TRUH`_S#Ti9d>nDPz2_yKkdP1(5&}Ki|H0xbo>f)yJC1A3 zy$Mok;+_3kU%!NL`P-&R=7PT@E`~|WE9(^RCv}A%OxTI292+F?elgDMC?1dXhTV z2!*G|$1R5gVmXG-4a)0LsIAS)Hml0BD;RlkM)bBX?o+|=;|UoJgQx|1y>mB<$xIl)P|+o$$4z{qhqubOioS? zMQO?;XB4F$oL?ao@XZ8Rll=Vr;^MX8Jca5CA6UkQl;_qIprq#}CX(Eb15c){g`DQZ z0YB;Lg8nKcQq<@AV3w2y;1X~Z6v!8i0>~h^1FUypVc{>h5UKG=@;4iSnncS$_(X+q zucs@Z)~Y19w)qS>)pEyJNeh*J>;%S=swp~(3)f^TA1E_vs4?B5~Y1f44f2t z?XfbDyf2%Y;8hywPZ!d^Q7&$vsY$)cdF`X94xjdUUn+kbubQP1kg~HeH8r*QdllEz z@Hsp{_AZ}ecLRRRnMcIfdy6y6P$lXf1FjYo6~!Xy*G(Y+BoRC5J&(>K6{&9OjE4Zd zN%|F!t_uPq6cKr>6c14X94=kRiK0x(HEgL=N-JX{R$*TN~PSNTtqVnP5qgouMG5f-v&Ta11_MfZ`X2kD`73-YNM=j z#zD~YGq8p8FP3BTN7v1@q@+c^etY^iRUOic020&xuQS{K>tXJH72vO8Yn>M{6|c&G PCP7N_YS0Q<(|7*|CV_B< literal 0 HcmV?d00001 diff --git a/mq-demo/images/image-20250519135131274.png b/mq-demo/images/image-20250519135131274.png new file mode 100644 index 0000000000000000000000000000000000000000..7c34c3fcb3264ef51ce0f148b411a39593d4293c GIT binary patch literal 7902 zcma)hXIN8B*KTZ}bmRe4sv&@ifCADxL7H@>6N>Z_I-wV7K?I~oFVb76p$MT!QIX!O zA<}!V0SRaOyw~^Ry}s*Q=jxpSBG)s%MG zmR`O+Z9!eXX@=^4ZU01F;bfzI_VcQF!kMJ^?K{w}P)@12N~x`-pNESyX?p{!7k-D3 zw{Un2V4E~~svSLW|JX{^3EWC$$Zmi@#YE4^K%mbLx}JeRL3aPUAnj=#sUj~Wv$zH( z^b6OcM;dUc$wP816^K>!GXoI_q(N?e4`})Y(hO*F9O6nA6?5vl&>GY zIyvJYcJNk{2_$_tin((D);f!xGx#~mgPbNzrOjk%A|AHGS~c&VuyCV}zWT&WtpvCyZ8 zTGkpCZ1R(bfM+JyUyFFpTH*!8NW`cN6~~QT9UuBqlyATHdo-~*IH~@c=^%^Eq95Nc zcUppK9%IhMxVYk6=rO4*eWkay^Y)$8wu=&e7(?CtvZIPW51gRW{!~eP9A0y6T*_qp z+=ix*x;;IA_S3)3)Ob5O;UY%c5j)nsClIz}=QC!RV(n;+*kN#R za%^RH%7msFzk2JapHJupNYPkxfIMoT46JKO!W6nvTJJVjQu1gvqfSqa-s8?Y*4xJC z*9>d)0w^rHmZS1XwS?_s!n3U|0 z@K7dEGjao;&Mld7MNt^;ak5UOcVqh8W`!z7gSCs)GDST$f?Rygc3&1n+V~uuDdsef z=_r_}YvTzwUV<0@tzwYU&PVU4>1i&ZcJ;#Rq62IlYJAm2VOy3rzu1-utEt@!QHu>Q zWrFo?h-wJC`nj*GCJXq#-3oRgH82Ay`VOY#zR8ajr`@7XenmIz{buQ8$BMk9q|~5! z<3r63JMZKeYo+}R2dwD{kI}tb4K454zzbi%xf@Tk_v(e&?^26Y!U`Q#QO3)_RO)7^ z-IeE!%>6^xF*Xbb-)&*mntS8m=!8>-W;Du4UiQCnTwUVC{Pyn0tTewGS4BmxzM4Hw z&YVPcJ=VliyR4jqTV|;Q`UmdyZEgmd!=^_ctndUc>rD`OU++#}lq)luKH z+HehYcZ9#t#=9BpNKP6L=CP|m7IFVs|IOsDJ%)|CK^MU46)}rJUGh$Tg2V31721IZ z{BrHzC)3wFcdlrkV?V(D0ldbVj)|^UdJo&__c*GB?O|^3!Ag7Q(1hm&EJ{8A*5h@? z_aXb&DXH#WOdPV2wzXF6@6r;JT<$izxPmNk=?9=D`$=d1cFrCgp3%;Y1auY{n@F5_1fBrTC5<2{p0vD zHpRtoVriu-gGF~EW0)fpUw6AKS$5XuF4i8bbB=mmT6#_-NsRdFKn7}!d0|aG6CSXG ziaq_*(o%W61wVBhx@hwLQ;tNhdLch*YHHnX*Eh`bN!hy2LB{Sda-KOmk_-G1jFf=6 z8-{l?N~yi?~0qCHJd`5&mrdXzk~t z)lc;M=K)Y*khI2;jIB0iXuOvzq;K5#o$=0AeLcc4A;PWfRM4e8CfUAf_NV#0H+~0( zNku+Dsp?KY1bl9t$pSS(L5`f|$y+=J7!W)y*Ag&0M&Md@I; zE1~cHZBwgY@LOYJXFj(n8T$aMJj%qLgH4AQ!-%G@N5$jks!Z|sKtV_(-^$8LQr*Hk z$_E86tSP0w%Ex%|lD3-JrZQXyeFPpLc=i%P}qPhS7 z0*lE?6J1PbZOH#_#G=F)pxhpaQd(-N(l)QjV0U--zyPp&?C7u0C((AsL;U~j|2RHA zmY0{`+L`Z+z`}*7>+0(6N52HSHX5<`A5l}^CkvO0pqLL71VL^o#a|DWR#Q{6vU(Cr zL3BNMdRkv8e(#6Ku((*bek(9*(hO**fqCQbdIGCjt|1P=3-c zg!4PaBBo!hkBp4GdCUx2=srB9!0DP04l9{@?qXvF*99FCot&nM|B!N3=UBTo%MbcY zH;o)%Xt+1Wcx>+%W{BmA8`zvXF3DO`;{3y~sGYkrs`I3g$S*CPzBlMM%@aD&Z>MYo z!ZX{3?dk;6w^PfmNgo^l=3Z>5fJZKE@^-lz8BY9<*$ow;wR_7kp|Hj@td;8rOymT$ zeWHpe_Mfy-h=85lGD7ZcPMj;%Yhq;Ev4+$tMp4|p zx_PxK`&q9yrd(z1S&5$EyRXY)AXDnBHU!nMiQiCWgG)lq5IvuCKm1V%BYRHN27;?0 zZYb)W#478dBKfU^I)^T)cA7i=X_*R}90tb%J~82!ga;zAA*2i{q9v;pZI(=%MUlv( za*36JnE4BC?oH&*sfVB4T4R|N5zTPoqxh`1jlJtpYe!@MXr|S-s7RCDkQOeW9WZd) ztUhDcB%)q!Zhns7UKdPB{$&vvdP0_&Zo&)6@sOHvJLe|o{QG}@ZYFQ@r4)hV*hfg)JcqL4kS-2{PBtcXtgMRE?xkv* zuX^mp4QydJ$G}LxcJh#!E3Bn&yKeBbO}7cpmAJw-0?1droAzOc1irn95)({54dRaLuQbt{IUz_S zG~%22`Z(#f;W+o}zd~A1+BPh(>XR#sFsu^TlQ12Zv54=}#^TpYJ9v^NLzO0MPyE1 zg-b)=M^5m;$qUNK;|40~Pxq&a>n=Dn?^i#xFBRiFIuwuFvnNi^{kG?nZZG9=FJ??0 zdbB|!&rAE}C%y+zPHHB1Z0!{li8t1+>30BX$IucXBV6tCAH8zGs=50uj;ARF+OCQ_Aj#U#H{G!?ayPH|H z{JRWvMS}DP1KBcBZq~$2MY9EA3GxijtvS0DWLH@-^vQxS*!AQ@LoA(xTa>86d3Jj} zy$_g|l!hwWxt0&y9o(OWa`sUW0e_m1w1l`;j}F{}rXZH13u5{}q>XWJXdAmLr(YWm z$BSd$$~M`#RF%55>RmE0X6EEGXg*Jjl$8=P(p#(BYrk7o{!YYS`yMdPhf}B(me*A} zUZZ^VdKE)nS?V@IJc3ki9T%BnenQj+X5swVRA4LOr%lgP0q&CiUijRebcGOVXvRf$ z2k87qH4W@YRDTb=H%;@@E$n50v7wB>hE& zgo-@5T@vHs;xaSK$L+>>Zgv3Hm=ZQVtxu5fZ_OHvS6m!nJ?>#~5J!1(gKUYPlYN|MJ^M^r{w(M{5QEyPN^%(m>)exO zLYf5~(fa(PsO} z=hU&92pS8$N2Dm0W3u$wlY7r$F0UZ%Kyug3iBrH%AUeWC&p+QFra1GHREkIGa2YPRHZEvzg1W*FyfSo zsP83BtMmN9%ZIMI%7YR!U%Ks1FWxasis~#+bKjV>pnvzVeO^cSk+HtMh``^={m+?8 zrW#^@;ag2jO-@b@n?j6HleeZd85u4Ihd%SPMU>U=c7AHy{>sOg@pEp0irvp_ z4wY_CDy99}VQ$NlJ0N=62%vXV3R-Mi~xpS|BQnwpwx=dmnO zfSbl>_JgWqvvSK8c#sI9HyhutXnVrw_vc)c4& z0wGe+GjPk7PhEQz#u(4-8F?sAvc0M4?;@$ob$%gzv=yLv2Hrop88oF)0V|oUTCc17 zWS5*kN5(Lz;CmKevVKm1a5lDdNqq=b*mX}Fi<|F;ur%$B8uSO?t>M~i^ zle#(%Ft=W`$X?vxFY?s9vm9onOixdbR%!oM!3;tse=O52JhntHSZ3xpwEI_HSdAU; zo_?y>eZ=tZN?@7gy4i%VaoK8ov1gp=gNdP8vVz?qgCBmek7n*;c+a2<-^Yf!yo(CQ zc-V$fknq6_k;Owia&4}~7kbpgRhjrdvWiP?6!cM8*3ita$jomKbNWSWjw0YI;^;C) zEPSOQHXLj}VFK)*VhF#Og)+VOR8l1B!fkQwhf>DeG}y!9#FFW88`k@Q-pD zHCc22YL|?+ni4)dW`#B~0$leK1CG{)F}tlQ(8`t*0|cT$r_6sbT8M&0a?4p5SzKOD zo=ickDB$2qXE6U|;eyk*M@3nH2@17XYZPlA!1BP2RR*(SW{%S+z%!;AO=kX^UEPkV z<>8ja-uWKw)I_@{i4~Iq6aWu z0{H0rTc|=#^(>S=nxlsV5c#+oOO-_0tP8hJAVmeFgz8)GaE7zZA25MR)bU{A+K#H3 z`g&4X%#1ulpzE3Rd+iEXB)Vg7fo>f!|Lu3$2iLR#z6}3_#rLlW)YvHeq?a{|(w-(#)=RGy=un4d&|tkL7AR zQ~;3~50UUc7noyV(A0hT@};MzprmSA5CF4}RYbUq_Oee(j8$Yu?drNYHAn%_)}p)k zYcD9041lb(#4_DnS|UXYpwF=`p7b!Cd#vpMd}~nwrdk98rphE(t5@^7N^_BY{0+2O z@*S~y(xyh>r@s8C{@KX|6eRz*I{ClaDhT*`f8Gt~S>84*s-N6$GG5@nkUO!%z|67z z*8>XON+Y`e3TSGg^p)d7A!-68f9#1rhU;+|O}r2d~m+eUEpt*>rq)D=QxTI? z)SCSf5)8jC+ak&I1FEu0csKBQ zLdUO^t!)hFHSqb)&$e$CPE-tt@4I)`BazKJ=}-nXoqLXBW7w_6O(*>y@G{M%yRVpx z$Nhu?<@GiCU7+ANgxz~&RFh*GASxiw3upA;C)4t}7e%6tVXZ}{Px|1?I1Y3k;#LptMrMov)r3MP)(F+we- zOf@jS*^v}c{i7C6oZbc#U%yUc3yET&tL(fDmm24S-=9a#1Hrn`ix)uFAi4L2s3<_C z?RwkyK}iP@xGNU$6T&VFfso_(ldzlB7iP7Wt2Yt;eVeQCN=^Su8dYbMmZbDk!%4n* zr#mJkRztfiuXpM`D4;a?*G_++do1q)nZY1M=xZ$dl{nXnw!Nfp+2hvna@RTf~3U( z5e&ep`&rMtH0#H>YUPAoQ#_yfG51i)V{*_>#rS;S^`}ptW|SCj-?Xx}=I7_HGjL2C zuP|wAwd_rLgCaGj2aZydl;iUOAQ~gQP5D<7Gf9L{cb0zrha04b0CEbc_?uLmCE~Y% z)0TfB5;%I%pZHu0QDO=FC=7K7LTEV$j5e5VfVG6_X^Qj4TK@u#QWCS{wJxO^Q^f& zJw5W3)O3xquH%D(j0_Cg%+X4hWg32Kyy|>}y@Zy4WoaODGSJ%_<#-8tygU8852n`G zc!;e`=XY!>RVj(74&F8ine!LRU|2%+%SlnOqr^ zCvEFYcZQMDOX8Pk0NJ=TRUH`_S#Ti9d>nDPz2_yKkdP1(5&}Ki|H0xbo>f)yJC1A3 zy$Mok;+_3kU%!NL`P-&R=7PT@E`~|WE9(^RCv}A%OxTI292+F?elgDMC?1dXhTV z2!*G|$1R5gVmXG-4a)0LsIAS)Hml0BD;RlkM)bBX?o+|=;|UoJgQx|1y>mB<$xIl)P|+o$$4z{qhqubOioS? zMQO?;XB4F$oL?ao@XZ8Rll=Vr;^MX8Jca5CA6UkQl;_qIprq#}CX(Eb15c){g`DQZ z0YB;Lg8nKcQq<@AV3w2y;1X~Z6v!8i0>~h^1FUypVc{>h5UKG=@;4iSnncS$_(X+q zucs@Z)~Y19w)qS>)pEyJNeh*J>;%S=swp~(3)f^TA1E_vs4?B5~Y1f44f2t z?XfbDyf2%Y;8hywPZ!d^Q7&$vsY$)cdF`X94xjdUUn+kbubQP1kg~HeH8r*QdllEz z@Hsp{_AZ}ecLRRRnMcIfdy6y6P$lXf1FjYo6~!Xy*G(Y+BoRC5J&(>K6{&9OjE4Zd zN%|F!t_uPq6cKr>6c14X94=kRiK0x(HEgL=N-JX{R$*TN~PSNTtqVnP5qgouMG5f-v&Ta11_MfZ`X2kD`73-YNM=j z#zD~YGq8p8FP3BTN7v1@q@+c^etY^iRUOic020&xuQS{K>tXJH72vO8Yn>M{6|c&G PCP7N_YS0Q<(|7*|CV_B< literal 0 HcmV?d00001 diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java index e88e845..2ef7450 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java @@ -5,5 +5,6 @@ public class RabbitMQMessageListenerConstants { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String QUEUE_NAME = "queue.order"; public static final String ROUTING_KEY_DIRECT = "order"; + public static final String ALTERNATE_EXCHANGE_BACKUP = "exchange.test.backup"; } diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java index f63b4d2..01d0c20 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java @@ -3,61 +3,69 @@ package cn.bunny.mq.mqdemo.mq.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.*; +import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.QUEUE_NAME; @Component @Slf4j public class MessageListenerOrder { - /* 测试这个,需要注释下main那个 */ - @RabbitListener(bindings = @QueueBinding( - exchange = @Exchange(value = EXCHANGE_DIRECT), - value = @Queue(value = QUEUE_NAME, durable = "true"), - key = ROUTING_KEY_DIRECT - ) - ) - public void processMessage(String dataString, Message message, Channel channel) { - System.out.println("消费端接受消息:" + dataString); - } + // /* 测试这个,需要注释下main那个 */ + // @RabbitListener(bindings = @QueueBinding( + // exchange = @Exchange(value = EXCHANGE_DIRECT), + // value = @Queue(value = QUEUE_NAME, durable = "true"), + // key = ROUTING_KEY_DIRECT, + // arguments = @Argument(name = "alternate-exchange", value = ALTERNATE_EXCHANGE_BACKUP) + // ) + // ) + // public void processMessage(String dataString, Message message, Channel channel) { + // System.out.println("消费端接受消息:" + dataString); + // } + + // /* 如果测试这个需要注释上面那个 */ + // @RabbitListener(queues = {QUEUE_NAME}) + // public void processQueue(String dataString, Message message, Channel channel) throws IOException { + // // 设置deliverTag + // // 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 + // // 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 + // // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 + // long deliveryTag = message.getMessageProperties().getDeliveryTag(); + // + // try { + // // 核心操作 + // System.out.println("消费端 消息内容:" + dataString); + // channel.basicAck(deliveryTag, false); + // + // // 核心操作完成,返回ACK信息 + // } catch (Exception e) { + // // 当前参数是否是重新投递的,为true时重复投递过了,为法拉瑟是第一次投递 + // Boolean redelivered = message.getMessageProperties().getRedelivered(); + // + // // 第三个参数: + // // true:重新放回队列,broker会重新投递这个消息 + // // false:不重新放回队列,broker会丢弃这个消息 + // channel.basicNack(deliveryTag, false, !redelivered); + // + // // 除了 basicNack 外还有 basicReject,其中 basicReject 不能控制是否批量操作 + // channel.basicReject(deliveryTag, true); + // + // // 核心操作失败,返回NACK信息 + // throw new RuntimeException(e); + // } + // } - /* 如果测试这个需要注释上面那个 */ @RabbitListener(queues = {QUEUE_NAME}) - public void processQueue(String dataString, Message message, Channel channel) throws IOException { - // 设置deliverTag - // 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 - // 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 - // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 - long deliveryTag = message.getMessageProperties().getDeliveryTag(); + public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("消费者 消息内容:{}", dataString); - try { - // 核心操作 - System.out.println("消费端 消息内容:" + dataString); - channel.basicAck(deliveryTag, false); + TimeUnit.SECONDS.sleep(1); - // 核心操作完成,返回ACK信息 - } catch (Exception e) { - // 当前参数是否是重新投递的,为true时重复投递过了,为法拉瑟是第一次投递 - Boolean redelivered = message.getMessageProperties().getRedelivered(); - - // 第三个参数: - // true:重新放回队列,broker会重新投递这个消息 - // false:不重新放回队列,broker会丢弃这个消息 - channel.basicNack(deliveryTag, false, !redelivered); - - // 除了 basicNack 外还有 basicReject,其中 basicReject 不能控制是否批量操作 - channel.basicReject(deliveryTag, true); - - // 核心操作失败,返回NACK信息 - throw new RuntimeException(e); - } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } diff --git a/mq-demo/src/main/resources/application.yaml b/mq-demo/src/main/resources/application.yaml index b2cde8c..b3b7798 100644 --- a/mq-demo/src/main/resources/application.yaml +++ b/mq-demo/src/main/resources/application.yaml @@ -18,6 +18,7 @@ spring: listener: simple: acknowledge-mode: manual # 手动处理消息 + prefetch: 5 # 设置每次取回数量,消息条数(非字节或KB) # connection-timeout: 1s # 设置MQ连接超时时间 # template: # retry: diff --git a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java index ccc0454..9315ef0 100644 --- a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java +++ b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java @@ -23,9 +23,6 @@ class MqDemoApplicationTests { String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "你好小球球~~~"); - - Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); - rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, JSON.toJSONString(bunny)); } /* 测试失败交换机的情况 */ @@ -33,7 +30,6 @@ class MqDemoApplicationTests { void publishExchangeErrorTest() { String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; - rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的消息发送----"); Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); rabbitTemplate.convertAndSend(exchangeDirect + "~", routingKeyDirect, JSON.toJSONString(bunny)); @@ -45,9 +41,19 @@ class MqDemoApplicationTests { void publishQueueErrorTest() { String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; - rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的队列发送----"); Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect + "~", JSON.toJSONString(bunny)); } + + /* 发送消息,发送多条消息,测试使用 */ + @Test + void buildMessageTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】"); + } + } }