From 51172bd1420229a63b546e541e54057cff7ab2ff Mon Sep 17 00:00:00 2001 From: bunny <1319900154@qq.com> Date: Mon, 19 May 2025 17:04:20 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B6=88=E6=81=AF=E4=BA=8B?= =?UTF-8?q?=E5=8A=A1=E8=AF=B4=E6=98=8E;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq-demo/ReadMe.md | 80 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index d183fa9..455e81d 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -717,4 +717,82 @@ public void processMessageDelay(String dataString, Channel channel, Message mess log.info("<延迟消息>----消息本身{}", dataString); log.info("<延迟消息>----当前时间{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } -``` \ No newline at end of file +``` + +## 消息事务 + +在 RabbitMQ 中使用事务(Transaction)可以确保消息的可靠性,但并不推荐在高并发场景下使用,因为它会带来显著的性能开销。以下是详细分析和替代方案建议: + +**1. RabbitMQ 事务的机制** + +通过 `channel.txSelect()`、`channel.txCommit()`、`channel.txRollback()` 实现: +```java +channel.txSelect(); // 开启事务 +try { + channel.basicPublish(exchange, routingKey, props, message.getBytes()); + // 其他业务操作(如数据库写入) + channel.txCommit(); // 提交事务 +} catch (Exception e) { + channel.txRollback(); // 回滚事务 +} +``` + +#### **存在的问题** +| 问题 | 说明 | +| ---------------- | ------------------------------------------------------------ | +| **性能差** | 事务会同步阻塞信道,吞吐量下降约 2~10 倍(实测数据) | +| **伪原子性** | 只能保证消息发送到 Broker,无法保证业务操作(如 DB 更新)的原子性 | +| **复杂场景失效** | 分布式系统中,跨服务的事务需依赖 Seata 等方案,MQ 事务无法覆盖 | + +--- + +### **2. 生产环境推荐方案** +#### **(1)Confirm 模式(轻量级确认)** +- **原理**:异步确认消息是否成功到达 Broker。 +- **配置方式**: + ```java + // 开启 Confirm 模式 + channel.confirmSelect(); + + // 异步监听确认结果 + channel.addConfirmListener((deliveryTag, multiple) -> { + // 消息成功投递 + }, (deliveryTag, multiple) -> { + // 消息投递失败(可重试或记录日志) + }); + ``` +- **优点**:性能接近非事务模式,可靠性高。 + +#### **(2)消息补偿 + 幂等设计** +- **步骤**: + 1. 消息表记录发送状态(如 `status: sending/success/fail`)。 + 2. 定时任务补偿失败消息。 + 3. 消费者端做幂等处理(如唯一 ID + 去重表)。 +- **适用场景**:订单支付、库存扣减等关键业务。 + +#### **(3)本地消息表(最终一致性)** +```mermaid +sequenceDiagram + participant App + participant DB + participant MQ + App->>DB: 1. 业务数据+消息记录(同事务) + DB-->>App: 成功 + App->>MQ: 2. 异步发送消息 + MQ-->>App: ACK + App->>DB: 3. 更新消息状态 +``` + +仅在以下情况考虑使用: +- **低频操作**:如日均消息量 < 1k。 +- **强一致性要求**:且无法接受异步补偿的极端场景。 +- **简单业务**:无嵌套调用(如纯消息发送无 DB 操作)。 + +| 模式 | 吞吐量(条/秒) | 延迟 | 可靠性 | +| ------------ | --------------- | ---- | ------ | +| 事务模式 | 500~1k | 高 | 高 | +| Confirm 模式 | 10k~50k | 低 | 高 | +| 无确认 | 50k+ | 极低 | 低 | + +## 惰性队列 +