vue-java-tutorials/mq-demo/ReadMe.md

449 lines
15 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# RabbitMQ
## 基本示例
**生产者**
```java
@SpringBootTest
class MqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 1. 测试发送消息,示例代码
* 2. 测试成功的情况
*/
@Test
void publishTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "你好小球球~~~");
Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, JSON.toJSONString(bunny));
}
}
```
**消费者**
```java
@Component
@Slf4j
public class MessageListenerOrder {
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = EXCHANGE_DIRECT),
value = @Queue(value = QUEUE_NAME, durable = "true"),
key = ROUTING_KEY_DIRECT
)
)
public void processMessage(String dataString, Message message, Channel channel) {
System.out.println("消费端接受消息:" + dataString);
}
}
```
## 可靠性
### 1、消息没有发送到消息队列
Q消息没有发送到消息队列
A在生产者端进行确认具体操作中我们会分别针对交换机和队列进行确认如果没有成功发送到消息队列服务器上那就可以尝试重新发送。
A为目标交换机指定备份交换机当目标交换机投递失败时把消息投递至备份交换机。
**配置文件**
```yaml
rabbitmq:
host: ${bunny.rabbitmq.host}
port: ${bunny.rabbitmq.port}
username: ${bunny.rabbitmq.username}
password: ${bunny.rabbitmq.password}
virtual-host: ${bunny.rabbitmq.virtual-host}
publisher-confirm-type: correlated # 交换机确认
publisher-returns: true # 队列确认
```
#### 生产者确认
> [!NOTE]
> **@PostConstruct 注解**
> **作用**在Bean依赖注入完成后执行初始化方法构造器之后`afterPropertiesSet()`之前)。
>
> **特点**
> - 方法需**无参**、返回**void**,名称任意
> - 执行顺序:构造器 → 依赖注入 → `@PostConstruct`
> - 若抛出异常Bean创建会失败
>
> **注意**
> - 代理类(如`@Transactional`)中,会在**原始对象**初始化时调用
> - `prototype`作用域的Bean每次创建均会执行
> - 避免耗时操作,推荐轻量级初始化
>
> **替代方案**
> `InitializingBean`接口 或 `@Bean(initMethod="xxx")`
RabbitMQ配置
```java
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RabbitConfiguration implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("============correlationData <回调函数打印> = " + correlationData);
System.out.println("============ack <输出> = " + ack);
System.out.println("============cause <输出> = " + cause);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 发送到队列失败才会走这个
log.error("消息主体:{}", returnedMessage.getMessage().getBody());
log.error("应答码:{}", returnedMessage.getReplyCode());
log.error("消息使用的父交换机:{}", returnedMessage.getExchange());
log.error("消息使用的路由键:{}", returnedMessage.getRoutingKey());
}
}
```
**测试失败的情况**
交换机找不到
```java
/* 测试失败交换机的情况 */
@Test
void publishExchangeErrorTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的消息发送----");
Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
rabbitTemplate.convertAndSend(exchangeDirect + "~", routingKeyDirect, JSON.toJSONString(bunny));
}
```
队列找不到
```java
/* 测试失败队列的情况 */
@Test
void publishQueueErrorTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的队列发送----");
Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect + "~", JSON.toJSONString(bunny));
}
```
#### 备份交换机
> [!NOTE]
>
> 创建好的交换机是无法修改的,只能删除重新建立。
创建广播类型的交换机,因为没有路由键,只能通过广播的方式去找。
![image-20250519104914787](./images/image-20250519104914787.png)
创建与备份交换机的队列,交换机是广播的模式,不指定路由键。
![image-20250519105042156](./images/image-20250519105042156.png)
通过指定`Alternate exchange`的交换机进行绑定。第一个填写的不是备份交换机,是投递交换机,之后通过`Alternate exchange`绑定备份的交换机。
![image-20250519105250365](./images/image-20250519105250365.png)
### 2、服务器宕机
Q服务器宕机导致内存的消息丢失
A消息持久化到硬盘上哪怕服务器重启也不会导致消息丢失。
### 3、消费端宕机或抛异常
Q消费端宕机或者抛异常导致消息丢失。
A消费端消费消息成功给服务器返回ACK信息通知把消息恢复成待消费状态。
A消费端消费消息失败给服务器返回NACK信息同时把消息恢复为待消费状态这样就可以再次取回消息重试一次需要消费端接口支持幂等性
> [!NOTE]
>
> 需要引入一个内容deliverTag交付标签。
>
> 每个消息进入队列 时broker都会自动生成一个唯一标识64位整数消息投递时会携带交付标签。
>
> **作用:**消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后Broker需要对对应的消息执行后续操作例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
>
> **问题:**
>
> Q如果交换机是Fanout模式同一个消息广播到了不同队列deliveryTag会重复吗?
>
> A不会deliveryTag在Broker范围内唯一消息复制到各个队列deliverTag各不相同。
>
> **multiple说明**
>
> 1. 为false时单独处理这一条消息正常都是false。
> 2. true批量处理消息。
**配置文件**
```yaml
rabbitmq:
host: ${bunny.rabbitmq.host}
port: ${bunny.rabbitmq.port}
username: ${bunny.rabbitmq.username}
password: ${bunny.rabbitmq.password}
virtual-host: ${bunny.rabbitmq.virtual-host}
# 需要注释下面这两个,不需要这两个,因为要手动确认
# publisher-confirm-type: correlated # 交换机确认
# publisher-returns: true # 队列确认
listener:
simple:
acknowledge-mode: manual # 手动处理消息
```
#### 消费端流程
> [!NOTE]
>
> - `deliverTag`
> - 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后Broker需要对对应的消息执行后续操作。
> - 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。
> - 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
> - `basicReject`和`basicNack`区别:
> - `basicNack`可以设置是否批量操作,如果需要批量操作,第二个参数传入`true`为批量,反之。
> - `basicReject`只能做到批量操作。
```java
@RabbitListener(queues = {QUEUE_NAME})
public void processQueue(String dataString, Message message, Channel channel) throws IOException {
// 设置 deliverTag
// 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后Broker需要对对应的消息执行后续操作。
// 例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。
// 而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 核心操作
System.out.println("消费端 消息内容:" + dataString);
channel.basicAck(deliveryTag, false);
// 核心操作完成返回ACK信息
} catch (Exception e) {
// 当前参数是否是重新投递的为true时重复投递过了为法拉瑟是第一次投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 第三个参数:
// true重新放回队列broker会重新投递这个消息
// false不重新放回队列broker会丢弃这个消息
channel.basicNack(deliveryTag, false, !redelivered);
// 除了 basicNack 外还有 basicReject其中 basicReject 不能控制是否批量操作
channel.basicReject(deliveryTag, true);
// 核心操作失败返回NACK信息
throw new RuntimeException(e);
}
}
```
## 消费端限流
### 设置方式
在配置文件中设置`prefetch`值。如果不设置当生产者将消息放置到RabbitMQ中时是一次性取回的无论有多少。
设置了`prefetch`之后,每次取回数量就是`prefetch`的数量。
> [!NOTE]
>
> 并且在UI界面中`Unacked`值和我们设置的值一致。
>
> ![image-20250519135056806](./images/image-20250519135056806.png)
>
> *图中表示表示当前有5条消息已被消费者获取但未确认正在处理中*
>
> 当`prefetch=5`且消费速度为1条/秒时:
>
> - 初始会立即获取5条消息Unacked=5
> - 每ACK 1条后Broker会立即推送1条新消息动态保持Unacked≈5
```yaml
rabbitmq:
host: ${bunny.rabbitmq.host}
port: ${bunny.rabbitmq.port}
username: ${bunny.rabbitmq.username}
password: ${bunny.rabbitmq.password}
virtual-host: ${bunny.rabbitmq.virtual-host}
# publisher-confirm-type: correlated # 交换机确认
# publisher-returns: true # 队列确认
listener:
simple:
acknowledge-mode: manual # 手动处理消息
prefetch: 5 # 设置每次取回数量消息条数非字节或KB
```
> [!IMPORTANT]
> **RabbitMQ Prefetch 机制prefetch=5**
>
> 在 RabbitMQ 的 **prefetchQoS服务质量设置** 机制下,当 `prefetch=5` 时,**消费端的行为** 取决于 **消息确认模式Ack/Nack** 和 **消费速度**
>
> **核心规则**
> - 保持 `unacked` 消息数 **≤ prefetch (5)**
> - **不会** 等5条全部ACK完才发下一批而是 **动态补充**每ACK 1条补发1条
>
> **不同模式对比**
> | 模式 | 行为 |
> | -------------------------------------- | --------------------------------------------- |
> | **手动ACK** (`AcknowledgeMode.MANUAL`) | ✔️ 推荐!保持 `unacked ≤ 5`ACK后立即补新消息 |
> | **自动ACK** (`AcknowledgeMode.AUTO`) | ⚠️ 无效消息投递后立即ACKprefetch无法限流 |
>
> >*自动ACK模式下**prefetch仍然有效**限制未处理的消息数但消息会在投递后立即被ACK实际可能失去限流意义。*
>
> **消费慢时的表现**
>
> - 若消费速度=1条/秒RabbitMQ会 **持续补消息**,始终维持 `unacked ≈ 5`
>
### 测试方式
生产者生产一定数量的消息。
```java
/* 发送消息,发送多条消息,测试使用 */
@Test
void buildMessageTest() {
String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "测试消息发送【" + i + "】");
}
}
```
消费者进行消费消息,在消费的时候为了方便观察,每秒去读一个。
```java
@RabbitListener(queues = {QUEUE_NAME})
public void processMessagePrefetch(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
log.info("消费者 消息内容:{}", dataString);
TimeUnit.SECONDS.sleep(1);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
## 消息超时
- 给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除,从两个层面来给消息设定过期时间。
- **队列层面:**在队列层面设定消息过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
- **消息本身:**给具体的某个消息设定过期时间。
- 可通过两种方式设置消息TTLTime-To-Live那么哪个时间短哪个生效。
### 测试环境
测试时候不要用消费端消费(监听),监听取走了就不是超时了。
**创建交换机**
![image-20250519141236957](./images/image-20250519141236957.png)
**创建队列**
> [!NOTE]
> 过期时间单位为毫秒,如`5000`表示5秒。
创建交换机。直接点击下面的加粗字体可以直接设置。
**队列TTL设置步骤**
创建队列时在`Arguments`中添加:`x-message-ttl`: 设置值如5000
![image-20250519135919549](./images/image-20250519135919549.png)
**绑定交换机**
![image-20250519141220594](./images/image-20250519141220594.png)
### 测试队列层面
当在队列中设置了过期时间,超时后自动删除。
**测试Code**
```java
/* 测试过期时间 */
@Test
void buildExchangeTimeoutTest() {
String EXCHANGE = "exchange.test.timeout";
String QUEUE = "queue.test.timeout";
String ROUTING_KEY = "routing.key.test.timeout";
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "测试消息超时时间【" + i + "】");
}
}
```
**测试效果**
![image-20250519141706027](./images/image-20250519141706027.png)
### 测试消息层面
> [!TIP]
>
> 和上面代码对比,区别在于,第四个参数是否设置了`MessagePostProcessor`。
>
> **队列TTL vs 消息TTL**
>
> - 队列TTL统一管理适合批量消息
> - 消息TTL灵活控制适合特殊消息
> [!IMPORTANT]
>
> **TTL过期后消息直接删除**(非进入死信队列,除非配置了`x-dead-letter-exchange`
```java
/* 测试过期时间---消息层面实现消息超时自动删除 */
@Test
void buildExchangeTimeoutTest2() {
String EXCHANGE = "exchange.test.timeout";
String QUEUE = "queue.test.timeout";
String ROUTING_KEY = "routing.key.test.timeout";
MessagePostProcessor postProcessor = message -> {
// 设置过期时间
message.getMessageProperties().setExpiration("7000");
return message;
};
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "消息层面超时自动删除【" + i + "】", postProcessor);
}
}
```