✨ 消息事务说明;
This commit is contained in:
parent
c5db1c49db
commit
51172bd142
|
@ -717,4 +717,82 @@ public void processMessageDelay(String dataString, Channel channel, Message mess
|
|||
log.info("<延迟消息>----消息本身{}", dataString);
|
||||
log.info("<延迟消息>----当前时间{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
||||
}
|
||||
```
|
||||
```
|
||||
|
||||
## 消息事务
|
||||
|
||||
在 RabbitMQ 中使用事务(Transaction)可以确保消息的可靠性,但并不推荐在高并发场景下使用,因为它会带来显著的性能开销。以下是详细分析和替代方案建议:
|
||||
|
||||
**1. RabbitMQ 事务的机制**
|
||||
|
||||
通过 `channel.txSelect()`、`channel.txCommit()`、`channel.txRollback()` 实现:
|
||||
```java
|
||||
channel.txSelect(); // 开启事务
|
||||
try {
|
||||
channel.basicPublish(exchange, routingKey, props, message.getBytes());
|
||||
// 其他业务操作(如数据库写入)
|
||||
channel.txCommit(); // 提交事务
|
||||
} catch (Exception e) {
|
||||
channel.txRollback(); // 回滚事务
|
||||
}
|
||||
```
|
||||
|
||||
#### **存在的问题**
|
||||
| 问题 | 说明 |
|
||||
| ---------------- | ------------------------------------------------------------ |
|
||||
| **性能差** | 事务会同步阻塞信道,吞吐量下降约 2~10 倍(实测数据) |
|
||||
| **伪原子性** | 只能保证消息发送到 Broker,无法保证业务操作(如 DB 更新)的原子性 |
|
||||
| **复杂场景失效** | 分布式系统中,跨服务的事务需依赖 Seata 等方案,MQ 事务无法覆盖 |
|
||||
|
||||
---
|
||||
|
||||
### **2. 生产环境推荐方案**
|
||||
#### **(1)Confirm 模式(轻量级确认)**
|
||||
- **原理**:异步确认消息是否成功到达 Broker。
|
||||
- **配置方式**:
|
||||
```java
|
||||
// 开启 Confirm 模式
|
||||
channel.confirmSelect();
|
||||
|
||||
// 异步监听确认结果
|
||||
channel.addConfirmListener((deliveryTag, multiple) -> {
|
||||
// 消息成功投递
|
||||
}, (deliveryTag, multiple) -> {
|
||||
// 消息投递失败(可重试或记录日志)
|
||||
});
|
||||
```
|
||||
- **优点**:性能接近非事务模式,可靠性高。
|
||||
|
||||
#### **(2)消息补偿 + 幂等设计**
|
||||
- **步骤**:
|
||||
1. 消息表记录发送状态(如 `status: sending/success/fail`)。
|
||||
2. 定时任务补偿失败消息。
|
||||
3. 消费者端做幂等处理(如唯一 ID + 去重表)。
|
||||
- **适用场景**:订单支付、库存扣减等关键业务。
|
||||
|
||||
#### **(3)本地消息表(最终一致性)**
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant App
|
||||
participant DB
|
||||
participant MQ
|
||||
App->>DB: 1. 业务数据+消息记录(同事务)
|
||||
DB-->>App: 成功
|
||||
App->>MQ: 2. 异步发送消息
|
||||
MQ-->>App: ACK
|
||||
App->>DB: 3. 更新消息状态
|
||||
```
|
||||
|
||||
仅在以下情况考虑使用:
|
||||
- **低频操作**:如日均消息量 < 1k。
|
||||
- **强一致性要求**:且无法接受异步补偿的极端场景。
|
||||
- **简单业务**:无嵌套调用(如纯消息发送无 DB 操作)。
|
||||
|
||||
| 模式 | 吞吐量(条/秒) | 延迟 | 可靠性 |
|
||||
| ------------ | --------------- | ---- | ------ |
|
||||
| 事务模式 | 500~1k | 高 | 高 |
|
||||
| Confirm 模式 | 10k~50k | 低 | 高 |
|
||||
| 无确认 | 50k+ | 极低 | 低 |
|
||||
|
||||
## 惰性队列
|
||||
|
||||
|
|
Loading…
Reference in New Issue