diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index 51f792c..fed4912 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -320,7 +320,7 @@ public void processQueue(String dataString, Message message, Channel channel) th > | **手动ACK** (`AcknowledgeMode.MANUAL`) | ✔️ 推荐!保持 `unacked ≤ 5`,ACK后立即补新消息 | > | **自动ACK** (`AcknowledgeMode.AUTO`) | ⚠️ 无效!消息投递后立即ACK,prefetch无法限流 | > -> >自动ACK模式下**prefetch仍然有效**(限制未处理的消息数),但消息会在投递后立即被ACK,实际可能失去限流意义。 +> >*自动ACK模式下**prefetch仍然有效**(限制未处理的消息数),但消息会在投递后立即被ACK,实际可能失去限流意义。* > > **消费慢时的表现**: > @@ -359,4 +359,91 @@ public void processMessagePrefetch(String dataString, Channel channel, Message m ## 消息超时 -![image-20250519135919549](./images/image-20250519135919549.png) \ No newline at end of file +- 给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除,从两个层面来给消息设定过期时间。 + - **队列层面:**在队列层面设定消息过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。 + - **消息本身:**给具体的某个消息设定过期时间。 +- 可通过两种方式设置消息TTL(Time-To-Live),那么哪个时间短,哪个生效。 + +### 测试环境 + +测试时候不要用消费端消费(监听),监听取走了就不是超时了。 + +**创建交换机** + +![image-20250519141236957](./images/image-20250519141236957.png) + +**创建队列** + +> [!NOTE] +> 过期时间单位为毫秒,如`5000`表示5秒。 + +创建交换机。直接点击下面的加粗字体可以直接设置。 + +**队列TTL设置步骤**: + +创建队列时在`Arguments`中添加:`x-message-ttl`: 设置值(如5000) + +![image-20250519135919549](./images/image-20250519135919549.png) + +**绑定交换机** + +![image-20250519141220594](./images/image-20250519141220594.png) + +### 测试队列层面 + +当在队列中设置了过期时间,超时后自动删除。 + +**测试Code** + +```java +/* 测试过期时间 */ +@Test +void buildExchangeTimeoutTest() { + String EXCHANGE = "exchange.test.timeout"; + String QUEUE = "queue.test.timeout"; + String ROUTING_KEY = "routing.key.test.timeout"; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "测试消息超时时间【" + i + "】"); + } +} +``` + +**测试效果** + +![image-20250519141706027](./images/image-20250519141706027.png) + +### 测试消息层面 + +> [!TIP] +> +> 和上面代码对比,区别在于,第四个参数是否设置了`MessagePostProcessor`。 +> +> **队列TTL vs 消息TTL**: +> +> - 队列TTL:统一管理,适合批量消息 +> - 消息TTL:灵活控制,适合特殊消息 + +> [!IMPORTANT] +> +> **TTL过期后消息直接删除**(非进入死信队列,除非配置了`x-dead-letter-exchange`) + +```java +/* 测试过期时间---消息层面实现消息超时自动删除 */ +@Test +void buildExchangeTimeoutTest2() { + String EXCHANGE = "exchange.test.timeout"; + String QUEUE = "queue.test.timeout"; + String ROUTING_KEY = "routing.key.test.timeout"; + + MessagePostProcessor postProcessor = message -> { + // 设置过期时间 + message.getMessageProperties().setExpiration("7000"); + return message; + }; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "消息层面超时自动删除【" + i + "】", postProcessor); + } +} +``` \ No newline at end of file diff --git a/mq-demo/images/image-20250519135919549.png b/mq-demo/images/image-20250519135919549.png new file mode 100644 index 0000000..706fa60 Binary files /dev/null and b/mq-demo/images/image-20250519135919549.png differ diff --git a/mq-demo/images/image-20250519141220594.png b/mq-demo/images/image-20250519141220594.png new file mode 100644 index 0000000..3f9a4ba Binary files /dev/null and b/mq-demo/images/image-20250519141220594.png differ diff --git a/mq-demo/images/image-20250519141236957.png b/mq-demo/images/image-20250519141236957.png new file mode 100644 index 0000000..7f0e3f5 Binary files /dev/null and b/mq-demo/images/image-20250519141236957.png differ diff --git a/mq-demo/images/image-20250519141706027.png b/mq-demo/images/image-20250519141706027.png new file mode 100644 index 0000000..9b4b27e Binary files /dev/null and b/mq-demo/images/image-20250519141706027.png differ diff --git a/mq-demo/images/image-20250519141710598.png b/mq-demo/images/image-20250519141710598.png new file mode 100644 index 0000000..9b4b27e Binary files /dev/null and b/mq-demo/images/image-20250519141710598.png differ diff --git a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java index 9315ef0..397c6ef 100644 --- a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java +++ b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java @@ -4,6 +4,7 @@ import cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants; import cn.bunny.mq.mqdemo.domain.entity.Bunny; import com.alibaba.fastjson2.JSON; import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -56,4 +57,32 @@ class MqDemoApplicationTests { rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】"); } } + + /* 测试过期时间 */ + @Test + void buildExchangeTimeoutTest1() { + String EXCHANGE = "exchange.test.timeout"; + String ROUTING_KEY = "routing.key.test.timeout"; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "测试消息超时时间【" + i + "】"); + } + } + + /* 测试过期时间---消息层面实现消息超时自动删除 */ + @Test + void buildExchangeTimeoutTest2() { + String EXCHANGE = "exchange.test.timeout"; + String ROUTING_KEY = "routing.key.test.timeout"; + + MessagePostProcessor postProcessor = message -> { + // 设置过期时间 + message.getMessageProperties().setExpiration("7000"); + return message; + }; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "消息层面超时自动删除【" + i + "】", postProcessor); + } + } }