消费端限流完成

This commit is contained in:
bunny 2025-05-19 14:04:17 +08:00
parent d2649488a1
commit c4cfca50a4
7 changed files with 156 additions and 47 deletions

View File

@ -216,6 +216,7 @@ rabbitmq:
username: ${bunny.rabbitmq.username}
password: ${bunny.rabbitmq.password}
virtual-host: ${bunny.rabbitmq.virtual-host}
# 需要注释下面这两个,不需要这两个,因为要手动确认
# publisher-confirm-type: correlated # 交换机确认
# publisher-returns: true # 队列确认
listener:
@ -266,4 +267,96 @@ public void processQueue(String dataString, Message message, Channel channel) th
throw new RuntimeException(e);
}
}
```
```
## 消费端限流
### 设置方式
在配置文件中设置`prefetch`值。如果不设置当生产者将消息放置到RabbitMQ中时是一次性取回的无论有多少。
设置了`prefetch`之后,每次取回数量就是`prefetch`的数量。
> [!NOTE]
>
> 并且在UI界面中`Unacked`值和我们设置的值一致。
>
> ![image-20250519135056806](./images/image-20250519135056806.png)
>
> *图中表示表示当前有5条消息已被消费者获取但未确认正在处理中*
>
> 当`prefetch=5`且消费速度为1条/秒时:
>
> - 初始会立即获取5条消息Unacked=5
> - 每ACK 1条后Broker会立即推送1条新消息动态保持Unacked≈5
```yaml
rabbitmq:
host: ${bunny.rabbitmq.host}
port: ${bunny.rabbitmq.port}
username: ${bunny.rabbitmq.username}
password: ${bunny.rabbitmq.password}
virtual-host: ${bunny.rabbitmq.virtual-host}
# publisher-confirm-type: correlated # 交换机确认
# publisher-returns: true # 队列确认
listener:
simple:
acknowledge-mode: manual # 手动处理消息
prefetch: 5 # 设置每次取回数量消息条数非字节或KB
```
> [!IMPORTANT]
> **RabbitMQ Prefetch 机制prefetch=5**
>
> 在 RabbitMQ 的 **prefetchQoS服务质量设置** 机制下,当 `prefetch=5` 时,**消费端的行为** 取决于 **消息确认模式Ack/Nack** 和 **消费速度**
>
> **核心规则**
> - 保持 `unacked` 消息数 **≤ prefetch (5)**
> - **不会** 等5条全部ACK完才发下一批而是 **动态补充**每ACK 1条补发1条
>
> **不同模式对比**
> | 模式 | 行为 |
> | -------------------------------------- | --------------------------------------------- |
> | **手动ACK** (`AcknowledgeMode.MANUAL`) | ✔️ 推荐!保持 `unacked ≤ 5`ACK后立即补新消息 |
> | **自动ACK** (`AcknowledgeMode.AUTO`) | ⚠️ 无效消息投递后立即ACKprefetch无法限流 |
>
> >自动ACK模式下**prefetch仍然有效**限制未处理的消息数但消息会在投递后立即被ACK实际可能失去限流意义。
>
> **消费慢时的表现**
>
> - 若消费速度=1条/秒RabbitMQ会 **持续补消息**,始终维持 `unacked ≈ 5`
>
### 测试方式
生产者生产一定数量的消息。
```java
/* 发送消息,发送多条消息,测试使用 */
@Test
void buildMessageTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】");
}
}
```
消费者进行消费消息,在消费的时候为了方便观察,每秒去读一个。
```java
@RabbitListener(queues = {QUEUE_NAME})
public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
log.info("消费者 消息内容:{}", dataString);
TimeUnit.SECONDS.sleep(1);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
## 消息超时
![image-20250519135919549](./images/image-20250519135919549.png)

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.7 KiB

View File

@ -5,5 +5,6 @@ public class RabbitMQMessageListenerConstants {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String QUEUE_NAME = "queue.order";
public static final String ROUTING_KEY_DIRECT = "order";
public static final String ALTERNATE_EXCHANGE_BACKUP = "exchange.test.backup";
}

View File

@ -3,61 +3,69 @@ package cn.bunny.mq.mqdemo.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.*;
import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.QUEUE_NAME;
@Component
@Slf4j
public class MessageListenerOrder {
/* 测试这个需要注释下main那个 */
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = EXCHANGE_DIRECT),
value = @Queue(value = QUEUE_NAME, durable = "true"),
key = ROUTING_KEY_DIRECT
)
)
public void processMessage(String dataString, Message message, Channel channel) {
System.out.println("消费端接受消息:" + dataString);
}
// /* 测试这个需要注释下main那个 */
// @RabbitListener(bindings = @QueueBinding(
// exchange = @Exchange(value = EXCHANGE_DIRECT),
// value = @Queue(value = QUEUE_NAME, durable = "true"),
// key = ROUTING_KEY_DIRECT,
// arguments = @Argument(name = "alternate-exchange", value = ALTERNATE_EXCHANGE_BACKUP)
// )
// )
// public void processMessage(String dataString, Message message, Channel channel) {
// System.out.println("消费端接受消息:" + dataString);
// }
// /* 如果测试这个需要注释上面那个 */
// @RabbitListener(queues = {QUEUE_NAME})
// public void processQueue(String dataString, Message message, Channel channel) throws IOException {
// // 设置deliverTag
// // 消费端把消息处理结果ACKNACKReject等返回给Broker之后Broker需要对对应的消息执行后续操作
// // 例如删除消息重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条
// // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
// try {
// // 核心操作
// System.out.println("消费端 消息内容:" + dataString);
// channel.basicAck(deliveryTag, false);
//
// // 核心操作完成返回ACK信息
// } catch (Exception e) {
// // 当前参数是否是重新投递的为true时重复投递过了为法拉瑟是第一次投递
// Boolean redelivered = message.getMessageProperties().getRedelivered();
//
// // 第三个参数
// // true重新放回队列broker会重新投递这个消息
// // false不重新放回队列broker会丢弃这个消息
// channel.basicNack(deliveryTag, false, !redelivered);
//
// // 除了 basicNack 外还有 basicReject其中 basicReject 不能控制是否批量操作
// channel.basicReject(deliveryTag, true);
//
// // 核心操作失败返回NACK信息
// throw new RuntimeException(e);
// }
// }
/* 如果测试这个需要注释上面那个 */
@RabbitListener(queues = {QUEUE_NAME})
public void processQueue(String dataString, Message message, Channel channel) throws IOException {
// 设置deliverTag
// 消费端把消息处理结果ACKNACKReject等返回给Broker之后Broker需要对对应的消息执行后续操作
// 例如删除消息重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条
// 而deliveryTag作为消息的唯一标识就很好的满足了这个需求
long deliveryTag = message.getMessageProperties().getDeliveryTag();
public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
log.info("消费者 消息内容:{}", dataString);
try {
// 核心操作
System.out.println("消费端 消息内容:" + dataString);
channel.basicAck(deliveryTag, false);
TimeUnit.SECONDS.sleep(1);
// 核心操作完成返回ACK信息
} catch (Exception e) {
// 当前参数是否是重新投递的为true时重复投递过了为法拉瑟是第一次投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 第三个参数
// true重新放回队列broker会重新投递这个消息
// false不重新放回队列broker会丢弃这个消息
channel.basicNack(deliveryTag, false, !redelivered);
// 除了 basicNack 外还有 basicReject其中 basicReject 不能控制是否批量操作
channel.basicReject(deliveryTag, true);
// 核心操作失败返回NACK信息
throw new RuntimeException(e);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

View File

@ -18,6 +18,7 @@ spring:
listener:
simple:
acknowledge-mode: manual # 手动处理消息
prefetch: 5 # 设置每次取回数量消息条数非字节或KB
# connection-timeout: 1s # 设置MQ连接超时时间
# template:
# retry:

View File

@ -23,9 +23,6 @@ class MqDemoApplicationTests {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "你好小球球~~~");
Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, JSON.toJSONString(bunny));
}
/* 测试失败交换机的情况 */
@ -33,7 +30,6 @@ class MqDemoApplicationTests {
void publishExchangeErrorTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的消息发送----");
Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
rabbitTemplate.convertAndSend(exchangeDirect + "~", routingKeyDirect, JSON.toJSONString(bunny));
@ -45,9 +41,19 @@ class MqDemoApplicationTests {
void publishQueueErrorTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的队列发送----");
Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect + "~", JSON.toJSONString(bunny));
}
/* 发送消息,发送多条消息,测试使用 */
@Test
void buildMessageTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "");
}
}
}