diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index 14aa493..51f792c 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -216,6 +216,7 @@ rabbitmq: username: ${bunny.rabbitmq.username} password: ${bunny.rabbitmq.password} virtual-host: ${bunny.rabbitmq.virtual-host} + # 需要注释下面这两个,不需要这两个,因为要手动确认 # publisher-confirm-type: correlated # 交换机确认 # publisher-returns: true # 队列确认 listener: @@ -266,4 +267,96 @@ public void processQueue(String dataString, Message message, Channel channel) th throw new RuntimeException(e); } } -``` \ No newline at end of file +``` + +## 消费端限流 + +### 设置方式 + +在配置文件中设置`prefetch`值。如果不设置,当生产者将消息放置到RabbitMQ中时,是一次性取回的,无论有多少。 + +设置了`prefetch`之后,每次取回数量就是`prefetch`的数量。 + +> [!NOTE] +> +> 并且在UI界面中`Unacked`值和我们设置的值一致。 +> +> ![image-20250519135056806](./images/image-20250519135056806.png) +> +> *图中表示表示当前有5条消息已被消费者获取但未确认(正在处理中)* +> +> 当`prefetch=5`且消费速度为1条/秒时: +> +> - 初始会立即获取5条消息(Unacked=5) +> - 每ACK 1条后,Broker会立即推送1条新消息(动态保持Unacked≈5) + +```yaml + rabbitmq: + host: ${bunny.rabbitmq.host} + port: ${bunny.rabbitmq.port} + username: ${bunny.rabbitmq.username} + password: ${bunny.rabbitmq.password} + virtual-host: ${bunny.rabbitmq.virtual-host} + # publisher-confirm-type: correlated # 交换机确认 + # publisher-returns: true # 队列确认 + listener: + simple: + acknowledge-mode: manual # 手动处理消息 + prefetch: 5 # 设置每次取回数量,消息条数(非字节或KB) +``` + +> [!IMPORTANT] +> **RabbitMQ Prefetch 机制(prefetch=5)** +> +> 在 RabbitMQ 的 **prefetch(QoS,服务质量设置)** 机制下,当 `prefetch=5` 时,**消费端的行为** 取决于 **消息确认模式(Ack/Nack)** 和 **消费速度** +> +> **核心规则**: +> - 保持 `unacked` 消息数 **≤ prefetch (5)** +> - **不会** 等5条全部ACK完才发下一批,而是 **动态补充**(每ACK 1条,补发1条) +> +> **不同模式对比**: +> | 模式 | 行为 | +> | -------------------------------------- | --------------------------------------------- | +> | **手动ACK** (`AcknowledgeMode.MANUAL`) | ✔️ 推荐!保持 `unacked ≤ 5`,ACK后立即补新消息 | +> | **自动ACK** (`AcknowledgeMode.AUTO`) | ⚠️ 无效!消息投递后立即ACK,prefetch无法限流 | +> +> >自动ACK模式下**prefetch仍然有效**(限制未处理的消息数),但消息会在投递后立即被ACK,实际可能失去限流意义。 +> +> **消费慢时的表现**: +> +> - 若消费速度=1条/秒,RabbitMQ会 **持续补消息**,始终维持 `unacked ≈ 5` +> + +### 测试方式 + +生产者生产一定数量的消息。 + +```java +/* 发送消息,发送多条消息,测试使用 */ +@Test +void buildMessageTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】"); + } +} +``` + +消费者进行消费消息,在消费的时候为了方便观察,每秒去读一个。 + +```java +@RabbitListener(queues = {QUEUE_NAME}) +public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("消费者 消息内容:{}", dataString); + + TimeUnit.SECONDS.sleep(1); + + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); +} +``` + +## 消息超时 + +![image-20250519135919549](./images/image-20250519135919549.png) \ No newline at end of file diff --git a/mq-demo/images/image-20250519135056806.png b/mq-demo/images/image-20250519135056806.png new file mode 100644 index 0000000..7c34c3f Binary files /dev/null and b/mq-demo/images/image-20250519135056806.png differ diff --git a/mq-demo/images/image-20250519135131274.png b/mq-demo/images/image-20250519135131274.png new file mode 100644 index 0000000..7c34c3f Binary files /dev/null and b/mq-demo/images/image-20250519135131274.png differ diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java index e88e845..2ef7450 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/domain/RabbitMQMessageListenerConstants.java @@ -5,5 +5,6 @@ public class RabbitMQMessageListenerConstants { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String QUEUE_NAME = "queue.order"; public static final String ROUTING_KEY_DIRECT = "order"; + public static final String ALTERNATE_EXCHANGE_BACKUP = "exchange.test.backup"; } diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java index f63b4d2..01d0c20 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java @@ -3,61 +3,69 @@ package cn.bunny.mq.mqdemo.mq.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.*; +import static cn.bunny.mq.mqdemo.domain.RabbitMQMessageListenerConstants.QUEUE_NAME; @Component @Slf4j public class MessageListenerOrder { - /* 测试这个,需要注释下main那个 */ - @RabbitListener(bindings = @QueueBinding( - exchange = @Exchange(value = EXCHANGE_DIRECT), - value = @Queue(value = QUEUE_NAME, durable = "true"), - key = ROUTING_KEY_DIRECT - ) - ) - public void processMessage(String dataString, Message message, Channel channel) { - System.out.println("消费端接受消息:" + dataString); - } + // /* 测试这个,需要注释下main那个 */ + // @RabbitListener(bindings = @QueueBinding( + // exchange = @Exchange(value = EXCHANGE_DIRECT), + // value = @Queue(value = QUEUE_NAME, durable = "true"), + // key = ROUTING_KEY_DIRECT, + // arguments = @Argument(name = "alternate-exchange", value = ALTERNATE_EXCHANGE_BACKUP) + // ) + // ) + // public void processMessage(String dataString, Message message, Channel channel) { + // System.out.println("消费端接受消息:" + dataString); + // } + + // /* 如果测试这个需要注释上面那个 */ + // @RabbitListener(queues = {QUEUE_NAME}) + // public void processQueue(String dataString, Message message, Channel channel) throws IOException { + // // 设置deliverTag + // // 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 + // // 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 + // // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 + // long deliveryTag = message.getMessageProperties().getDeliveryTag(); + // + // try { + // // 核心操作 + // System.out.println("消费端 消息内容:" + dataString); + // channel.basicAck(deliveryTag, false); + // + // // 核心操作完成,返回ACK信息 + // } catch (Exception e) { + // // 当前参数是否是重新投递的,为true时重复投递过了,为法拉瑟是第一次投递 + // Boolean redelivered = message.getMessageProperties().getRedelivered(); + // + // // 第三个参数: + // // true:重新放回队列,broker会重新投递这个消息 + // // false:不重新放回队列,broker会丢弃这个消息 + // channel.basicNack(deliveryTag, false, !redelivered); + // + // // 除了 basicNack 外还有 basicReject,其中 basicReject 不能控制是否批量操作 + // channel.basicReject(deliveryTag, true); + // + // // 核心操作失败,返回NACK信息 + // throw new RuntimeException(e); + // } + // } - /* 如果测试这个需要注释上面那个 */ @RabbitListener(queues = {QUEUE_NAME}) - public void processQueue(String dataString, Message message, Channel channel) throws IOException { - // 设置deliverTag - // 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作。 - // 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。 - // 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 - long deliveryTag = message.getMessageProperties().getDeliveryTag(); + public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("消费者 消息内容:{}", dataString); - try { - // 核心操作 - System.out.println("消费端 消息内容:" + dataString); - channel.basicAck(deliveryTag, false); + TimeUnit.SECONDS.sleep(1); - // 核心操作完成,返回ACK信息 - } catch (Exception e) { - // 当前参数是否是重新投递的,为true时重复投递过了,为法拉瑟是第一次投递 - Boolean redelivered = message.getMessageProperties().getRedelivered(); - - // 第三个参数: - // true:重新放回队列,broker会重新投递这个消息 - // false:不重新放回队列,broker会丢弃这个消息 - channel.basicNack(deliveryTag, false, !redelivered); - - // 除了 basicNack 外还有 basicReject,其中 basicReject 不能控制是否批量操作 - channel.basicReject(deliveryTag, true); - - // 核心操作失败,返回NACK信息 - throw new RuntimeException(e); - } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } diff --git a/mq-demo/src/main/resources/application.yaml b/mq-demo/src/main/resources/application.yaml index b2cde8c..b3b7798 100644 --- a/mq-demo/src/main/resources/application.yaml +++ b/mq-demo/src/main/resources/application.yaml @@ -18,6 +18,7 @@ spring: listener: simple: acknowledge-mode: manual # 手动处理消息 + prefetch: 5 # 设置每次取回数量,消息条数(非字节或KB) # connection-timeout: 1s # 设置MQ连接超时时间 # template: # retry: diff --git a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java index ccc0454..9315ef0 100644 --- a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java +++ b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/MqDemoApplicationTests.java @@ -23,9 +23,6 @@ class MqDemoApplicationTests { String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "你好小球球~~~"); - - Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); - rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, JSON.toJSONString(bunny)); } /* 测试失败交换机的情况 */ @@ -33,7 +30,6 @@ class MqDemoApplicationTests { void publishExchangeErrorTest() { String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; - rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的消息发送----"); Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); rabbitTemplate.convertAndSend(exchangeDirect + "~", routingKeyDirect, JSON.toJSONString(bunny)); @@ -45,9 +41,19 @@ class MqDemoApplicationTests { void publishQueueErrorTest() { String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; - rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的队列发送----"); Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect + "~", JSON.toJSONString(bunny)); } + + /* 发送消息,发送多条消息,测试使用 */ + @Test + void buildMessageTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + + for (int i = 0; i < 100; i++) { + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】"); + } + } }