From d2649488a14645b362f1693b2218d07ae5d67fb8 Mon Sep 17 00:00:00 2001 From: bunny <1319900154@qq.com> Date: Mon, 19 May 2025 13:32:39 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B6=88=E8=B4=B9=E7=AB=AF?= =?UTF-8?q?=E5=AE=95=E6=9C=BA=E6=88=96=E6=8A=9B=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq-demo/ReadMe.md | 108 +++++++++++++++++- .../mq/listener/MessageListenerOrder.java | 35 ++++++ 2 files changed, 139 insertions(+), 4 deletions(-) diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index 2959af9..14aa493 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -50,9 +50,13 @@ public class MessageListenerOrder { ## 可靠性 -在Rabbi中队列和交换机都是持久化的,自动删除都是False。 +### 1、消息没有发送到消息队列 -### 生产者确认 +Q:消息没有发送到消息队列 + +A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列进行确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送。 + +A:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机。 **配置文件** @@ -67,7 +71,7 @@ rabbitmq: publisher-returns: true # 队列确认 ``` -#### RabbitMQ配置 +#### 生产者确认 > [!NOTE] > **@PostConstruct 注解** @@ -86,6 +90,8 @@ rabbitmq: > **替代方案**: > `InitializingBean`接口 或 `@Bean(initMethod="xxx")` +RabbitMQ配置 + ```java @Slf4j @Configuration @@ -166,4 +172,98 @@ void publishQueueErrorTest() { 通过指定`Alternate exchange`的交换机进行绑定。第一个填写的不是备份交换机,是投递交换机,之后通过`Alternate exchange`绑定备份的交换机。 -![image-20250519105250365](./images/image-20250519105250365.png) \ No newline at end of file +![image-20250519105250365](./images/image-20250519105250365.png) + +### 2、服务器宕机 + +Q:服务器宕机导致内存的消息丢失 + +A:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失。 + +### 3、消费端宕机或抛异常 + +Q:消费端宕机或者抛异常,导致消息丢失。 + +A:消费端消费消息成功,给服务器返回ACK信息,通知把消息恢复成待消费状态。 + +A:消费端消费消息失败,给服务器返回NACK信息,同时把消息恢复为待消费状态,这样就可以再次取回消息,重试一次(需要消费端接口支持幂等性)。 + +> [!NOTE] +> +> 需要引入一个内容:deliverTag,交付标签。 +> +> 每个消息进入队列 时,broker都会自动生成一个唯一标识(64位整数),消息投递时会携带交付标签。 +> +> **作用:**消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 +> +> **问题:** +> +> Q:如果交换机是Fanout模式,同一个消息广播到了不同队列,deliveryTag会重复吗? +> +> A:不会,deliveryTag在Broker范围内唯一,消息复制到各个队列,deliverTag各不相同。 +> +> **multiple说明** +> +> 1. 为false时单独处理这一条消息;正常都是false。 +> 2. true批量处理消息。 + +**配置文件** + +```yaml +rabbitmq: + host: ${bunny.rabbitmq.host} + port: ${bunny.rabbitmq.port} + username: ${bunny.rabbitmq.username} + password: ${bunny.rabbitmq.password} + virtual-host: ${bunny.rabbitmq.virtual-host} + # publisher-confirm-type: correlated # 交换机确认 + # publisher-returns: true # 队列确认 + listener: + simple: + acknowledge-mode: manual # 手动处理消息 +``` + +#### 消费端流程 + +> [!NOTE] +> +> - `deliverTag` +> - 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 +> - 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 +> - 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 +> - `basicReject`和`basicNack`区别: +> - `basicNack`可以设置是否批量操作,如果需要批量操作,第二个参数传入`true`为批量,反之。 +> - `basicReject`只能做到批量操作。 + +```java +@RabbitListener(queues = {QUEUE_NAME}) +public void processQueue(String dataString, Message message, Channel channel) throws IOException { + // 设置 deliverTag + // 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 + // 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 + // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + + try { + // 核心操作 + System.out.println("消费端 消息内容:" + dataString); + channel.basicAck(deliveryTag, false); + + // 核心操作完成,返回ACK信息 + } catch (Exception e) { + // 当前参数是否是重新投递的,为true时重复投递过了,为法拉瑟是第一次投递 + Boolean redelivered = message.getMessageProperties().getRedelivered(); + + // 第三个参数: + // true:重新放回队列,broker会重新投递这个消息 + // false:不重新放回队列,broker会丢弃这个消息 + channel.basicNack(deliveryTag, false, !redelivered); + + // 除了 basicNack 外还有 basicReject,其中 basicReject 不能控制是否批量操作 + channel.basicReject(deliveryTag, true); + + // 核心操作失败,返回NACK信息 + throw new RuntimeException(e); + } +} +``` \ No newline at end of file diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java index 130e805..f63b4d2 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java @@ -9,12 +9,15 @@ import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import java.io.IOException; + import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.*; @Component @Slf4j public class MessageListenerOrder { + /* 测试这个,需要注释下main那个 */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = EXCHANGE_DIRECT), value = @Queue(value = QUEUE_NAME, durable = "true"), @@ -25,4 +28,36 @@ public class MessageListenerOrder { System.out.println("消费端接受消息:" + dataString); } + /* 如果测试这个需要注释上面那个 */ + @RabbitListener(queues = {QUEUE_NAME}) + public void processQueue(String dataString, Message message, Channel channel) throws IOException { + // 设置deliverTag + // 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 + // 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 + // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + + try { + // 核心操作 + System.out.println("消费端 消息内容:" + dataString); + channel.basicAck(deliveryTag, false); + + // 核心操作完成,返回ACK信息 + } catch (Exception e) { + // 当前参数是否是重新投递的,为true时重复投递过了,为法拉瑟是第一次投递 + Boolean redelivered = message.getMessageProperties().getRedelivered(); + + // 第三个参数: + // true:重新放回队列,broker会重新投递这个消息 + // false:不重新放回队列,broker会丢弃这个消息 + channel.basicNack(deliveryTag, false, !redelivered); + + // 除了 basicNack 外还有 basicReject,其中 basicReject 不能控制是否批量操作 + channel.basicReject(deliveryTag, true); + + // 核心操作失败,返回NACK信息 + throw new RuntimeException(e); + } + } + }