diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index fed4912..df27b46 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -446,4 +446,133 @@ void buildExchangeTimeoutTest2() { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "消息层面超时自动删除【" + i + "】", postProcessor); } } -``` \ No newline at end of file +``` + +## 死信队列 + +> [!NOTE] +> **死信(Dead Letter)**:满足以下任一条件的消息: +> 1. 被消费者拒绝且不重新入队(`requeue=false`) +> 2. 消息在队列中超过TTL时间 +> 3. 队列达到最大长度限制(溢出) + +**概念:**当消息因上述原因无法被正常消费时,会被标记为死信(Dead Letter),并可通过死信交换机(DLX)路由到死信队列。 + +**产生原因:** + +1. **拒绝:**消费者拒绝消息`basicNack`/`basicReject`,并且不把消息重新放入源目标队列,`requeue=false`。 +2. **溢出:**队列中消息数量达到限制。比如队列最大只能存储10条消息,且现在已经存储了10条消息,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信。 +3. **超时:**消息达到超时时间未被消息。 + +**死信处理方式:** + +- **丢弃** + 适用于对业务无影响的非关键消息(需明确丢弃的监控手段)。 +- **入库** + 将死信持久化到数据库,便于后续分析或人工干预。 +- **监听(推荐)** + - 配置独立的死信交换机和队列,与业务逻辑解耦。 + - 消费者专注处理异常消息(如补偿、告警等)。 + +### 测试环境 + +#### 搭建死信环境 + +**创建死信交换机** + +exchange.dead.letter.video + +![image-20250519151418248](./images/image-20250519151418248.png) + +**创建死信队列** + +queue.dead.letter.video + +![image-20250519151430393](./images/image-20250519151430393.png) + +**绑定死信队列** + +routing.key.dead.letter.video + +![image-20250519151451777](./images/image-20250519151451777.png) + +#### 搭建正常环境 + +> [!WARNING] +> 必须为正常队列配置以下参数才能生效: +> - `x-dead-letter-exchange`:指定死信交换机 +> - `x-dead-letter-routing-key`:指定死信路由键 + +**正常交换机** + +exchange.normal.video + +**正常队列** + +queue.normal.video + +这时设置最大长度为10,一次接受10条消息,超出后进入死信。 + +![image-20250519152200270](./images/image-20250519152200270.png) + +**正常路由键** + +routing.key.normal.video + +到正常的队列中指定正常的交换机和正常路由键。 + +![image-20250519152552486](./images/image-20250519152552486.png) + +### 测试Code + +#### 因拒绝产生 + +**含义:** + +- 本来时监听正常队列的,处理一些逻辑,但是因为某些原因消息成死信了,消息转到死信队列中。 +- 这时候,在下面又监听了死信队列,死信队列可以监听到成死信的消息。 + +```java +/* 测试死信---监听正常队列 */ +@RabbitListener(queues = {"queue.normal.video"}) +public void processMessageNormal(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("监听正常队列----接受到:{}", dataString); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); +} + +/* 测试死信---监听死信队列 */ +@RabbitListener(queues = {"queue.dead.letter.video"}) +public void processMessageDeadLetter(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("监听死信队列----接收到:{}", dataString); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); +} +``` + +向正常队列发送消息 + +![image-20250519153747525](./images/image-20250519153747525.png) + +#### 因超时或溢出 + +在上设置了最大长度为10,如果发送40条那么有30条是溢出的,这时会进入死信队列中。 + +测试时注释掉监听死信队列和正常队列的代码,之后看UI界面折线图。 + +```java +/* 因超时或移除产生死信 */ +@Test +void buildExchangeOverflowTest() { + String EXCHANGE = "exchange.normal.video"; + String ROUTING_KEY = "routing.key.normal.video"; + + for (int i = 0; i < 40; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "因超时或移除产生死信【" + i + "】"); + } +} +``` + +从图中可以看出,是分多批进行的。 + +在之前设置中,消息最大接受是10,最多只能接收到10条消息,之后溢出消息进入死信,其中有10条消息是因为延迟,进入了死信。 + +![image-20250519154653646](./images/image-20250519154653646.png) \ No newline at end of file diff --git a/mq-demo/images/image-20250519151418248.png b/mq-demo/images/image-20250519151418248.png new file mode 100644 index 0000000..e65e7fb Binary files /dev/null and b/mq-demo/images/image-20250519151418248.png differ diff --git a/mq-demo/images/image-20250519151430393.png b/mq-demo/images/image-20250519151430393.png new file mode 100644 index 0000000..c8a7415 Binary files /dev/null and b/mq-demo/images/image-20250519151430393.png differ diff --git a/mq-demo/images/image-20250519151451777.png b/mq-demo/images/image-20250519151451777.png new file mode 100644 index 0000000..f3612ba Binary files /dev/null and b/mq-demo/images/image-20250519151451777.png differ diff --git a/mq-demo/images/image-20250519152200270.png b/mq-demo/images/image-20250519152200270.png new file mode 100644 index 0000000..e03e24f Binary files /dev/null and b/mq-demo/images/image-20250519152200270.png differ diff --git a/mq-demo/images/image-20250519152552486.png b/mq-demo/images/image-20250519152552486.png new file mode 100644 index 0000000..a1e4ee1 Binary files /dev/null and b/mq-demo/images/image-20250519152552486.png differ diff --git a/mq-demo/images/image-20250519153747525.png b/mq-demo/images/image-20250519153747525.png new file mode 100644 index 0000000..ea64e2f Binary files /dev/null and b/mq-demo/images/image-20250519153747525.png differ diff --git a/mq-demo/images/image-20250519154653646.png b/mq-demo/images/image-20250519154653646.png new file mode 100644 index 0000000..09d2deb Binary files /dev/null and b/mq-demo/images/image-20250519154653646.png differ diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java index 01d0c20..9bef018 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java @@ -68,4 +68,17 @@ public class MessageListenerOrder { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } + /* *//* 测试死信---监听正常队列 *//* + @RabbitListener(queues = {"queue.normal.video"}) + public void processMessageNormal(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("监听正常队列----接受到:{}", dataString); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); + } + + *//* 测试死信---监听死信队列 *//* + @RabbitListener(queues = {"queue.dead.letter.video"}) + public void processMessageDeadLetter(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("监听死信队列----接收到:{}", dataString); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } */ } diff --git a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java index 397c6ef..93a0770 100644 --- a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java +++ b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java @@ -85,4 +85,15 @@ class MqDemoApplicationTests { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "消息层面超时自动删除【" + i + "】", postProcessor); } } + + /* 因超时或移除产生死信 */ + @Test + void buildExchangeOverflowTest() { + String EXCHANGE = "exchange.normal.video"; + String ROUTING_KEY = "routing.key.normal.video"; + + for (int i = 0; i < 40; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "因超时或移除产生死信【" + i + "】"); + } + } }