✨ 优先级队列完成;
This commit is contained in:
parent
fed632f16e
commit
25c582093d
|
@ -848,4 +848,86 @@ sequenceDiagram
|
|||
Map<String, Object> args = new HashMap<>();
|
||||
args.put("x-queue-mode", "lazy");
|
||||
channel.queueDeclare("myLazyQueue", true, false, false, args);
|
||||
```
|
||||
|
||||
## 优先级队列
|
||||
|
||||
**优先级范围**
|
||||
|
||||
- 通过 `x-max-priority` 参数定义队列支持的最大优先级(默认0=无优先级)
|
||||
- 有效值:`1-255`(建议不超过10,因[官方文档](https://www.rabbitmq.com/priority.html)指出高值会显著影响性能)
|
||||
|
||||
**消息排序规则**
|
||||
|
||||
- 高优先级消息**优先被消费**(同优先级则遵循FIFO)
|
||||
- 仅当队列堆积时生效(空队列时无意义)
|
||||
|
||||
### 测试环境
|
||||
|
||||
**创建交换机**
|
||||
|
||||
exchange.test.priority
|
||||
|
||||

|
||||
|
||||
**创建队列**
|
||||
|
||||
队列名称:queue.test.priority
|
||||
|
||||
| 参数 | 值 | 说明 |
|
||||
| :--------------- | :--- | :------------------------------ |
|
||||
| `x-max-priority` | 10 | 必须手动添加(UI/代码均可设置) |
|
||||
|
||||

|
||||
|
||||
**绑定路由**
|
||||
|
||||
routing.key.test.priority
|
||||
|
||||

|
||||
|
||||
### 测试Code
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
A[生产者设置priority] --> B{消息入队}
|
||||
B -->|高优先级| C[插入队列头部附近]
|
||||
B -->|低优先级| D[插入队列尾部附近]
|
||||
```
|
||||
|
||||
> [!IMPORTANT]
|
||||
>
|
||||
> 设置优先级,不能超过设置的优先级,例如设置最高优先级 x-max-priority: 10
|
||||
>
|
||||
> 最高不能超过10
|
||||
|
||||
**生产者**
|
||||
|
||||
```java
|
||||
/* 测试消息优先级 */
|
||||
@Test
|
||||
void priorityTest() {
|
||||
for (int i = 0; i <= 10; i++) {
|
||||
int finalI = i;
|
||||
rabbitTemplate.convertAndSend("exchange.test.priority",
|
||||
"routing.key.test.priority",
|
||||
"优先级消息-" + i,
|
||||
message -> {
|
||||
// 设置优先级,不能超过设置的优先级,例如设置最高优先级 x-max-priority: 10
|
||||
// 最高不能超过10
|
||||
message.getMessageProperties().setPriority(finalI);
|
||||
return message;
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**消费者**
|
||||
|
||||
```java
|
||||
/* 测试优先级队列 */
|
||||
@RabbitListener(queues = "queue.test.priority")
|
||||
public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
|
||||
log.info("<<优先级队列>>----<priority>{}", dataString);
|
||||
}
|
||||
```
|
Binary file not shown.
After Width: | Height: | Size: 14 KiB |
Binary file not shown.
After Width: | Height: | Size: 19 KiB |
Binary file not shown.
After Width: | Height: | Size: 13 KiB |
|
@ -88,4 +88,10 @@ public class MessageListenerOrder {
|
|||
log.info("<延迟消息>----当前时间{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
||||
}
|
||||
|
||||
/* 测试优先级队列 */
|
||||
@RabbitListener(queues = "queue.test.priority")
|
||||
public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
|
||||
log.info("<<优先级队列>>----<priority>{}", dataString);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -117,4 +117,21 @@ class MqDemoApplicationTests {
|
|||
messagePostProcessor
|
||||
);
|
||||
}
|
||||
|
||||
/* 测试消息优先级 */
|
||||
@Test
|
||||
void priorityTest() {
|
||||
for (int i = 0; i <= 10; i++) {
|
||||
int finalI = i;
|
||||
rabbitTemplate.convertAndSend("exchange.test.priority",
|
||||
"routing.key.test.priority",
|
||||
"优先级消息-" + i,
|
||||
message -> {
|
||||
// 设置优先级,不能超过设置的优先级,例如设置最高优先级 x-max-priority: 10
|
||||
// 最高不能超过10
|
||||
message.getMessageProperties().setPriority(finalI);
|
||||
return message;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue