diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index fa85bfc..b53ff4c 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -848,4 +848,86 @@ sequenceDiagram Map args = new HashMap<>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myLazyQueue", true, false, false, args); +``` + +## 优先级队列 + +**优先级范围** + +- 通过 `x-max-priority` 参数定义队列支持的最大优先级(默认0=无优先级) +- 有效值:`1-255`(建议不超过10,因[官方文档](https://www.rabbitmq.com/priority.html)指出高值会显著影响性能) + +**消息排序规则** + +- 高优先级消息**优先被消费**(同优先级则遵循FIFO) +- 仅当队列堆积时生效(空队列时无意义) + +### 测试环境 + +**创建交换机** + +exchange.test.priority + +![image-20250519171927379](./images/image-20250519171927379.png) + +**创建队列** + +队列名称:queue.test.priority + +| 参数 | 值 | 说明 | +| :--------------- | :--- | :------------------------------ | +| `x-max-priority` | 10 | 必须手动添加(UI/代码均可设置) | + +![image-20250519172029093](./images/image-20250519172029093.png) + +**绑定路由** + +routing.key.test.priority + +![image-20250519172210296](./images/image-20250519172210296.png) + +### 测试Code + +```mermaid +graph LR +A[生产者设置priority] --> B{消息入队} +B -->|高优先级| C[插入队列头部附近] +B -->|低优先级| D[插入队列尾部附近] +``` + +> [!IMPORTANT] +> +> 设置优先级,不能超过设置的优先级,例如设置最高优先级 x-max-priority: 10 +> +> 最高不能超过10 + +**生产者** + +```java +/* 测试消息优先级 */ +@Test +void priorityTest() { + for (int i = 0; i <= 10; i++) { + int finalI = i; + rabbitTemplate.convertAndSend("exchange.test.priority", + "routing.key.test.priority", + "优先级消息-" + i, + message -> { + // 设置优先级,不能超过设置的优先级,例如设置最高优先级 x-max-priority: 10 + // 最高不能超过10 + message.getMessageProperties().setPriority(finalI); + return message; + }); + } +} +``` + +**消费者** + +```java +/* 测试优先级队列 */ +@RabbitListener(queues = "queue.test.priority") +public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("<<优先级队列>>----{}", dataString); +} ``` \ No newline at end of file diff --git a/mq-demo/images/image-20250519171927379.png b/mq-demo/images/image-20250519171927379.png new file mode 100644 index 0000000..b0c2e08 Binary files /dev/null and b/mq-demo/images/image-20250519171927379.png differ diff --git a/mq-demo/images/image-20250519172029093.png b/mq-demo/images/image-20250519172029093.png new file mode 100644 index 0000000..97ed0a9 Binary files /dev/null and b/mq-demo/images/image-20250519172029093.png differ diff --git a/mq-demo/images/image-20250519172210296.png b/mq-demo/images/image-20250519172210296.png new file mode 100644 index 0000000..6d0d246 Binary files /dev/null and b/mq-demo/images/image-20250519172210296.png differ diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java index 30556ab..8738c7b 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java @@ -88,4 +88,10 @@ public class MessageListenerOrder { log.info("<延迟消息>----当前时间{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } + /* 测试优先级队列 */ + @RabbitListener(queues = "queue.test.priority") + public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("<<优先级队列>>----{}", dataString); + } + } diff --git a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java index 0b8711a..e9a2804 100644 --- a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java +++ b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java @@ -117,4 +117,21 @@ class MqDemoApplicationTests { messagePostProcessor ); } + + /* 测试消息优先级 */ + @Test + void priorityTest() { + for (int i = 0; i <= 10; i++) { + int finalI = i; + rabbitTemplate.convertAndSend("exchange.test.priority", + "routing.key.test.priority", + "优先级消息-" + i, + message -> { + // 设置优先级,不能超过设置的优先级,例如设置最高优先级 x-max-priority: 10 + // 最高不能超过10 + message.getMessageProperties().setPriority(finalI); + return message; + }); + } + } }