diff --git a/ReadMe.md b/ReadMe.md index 02a16a6..d063947 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -1,4 +1,5 @@ # 教程和Demo相关 1. Drools -2. vue的v-model绑定 \ No newline at end of file +2. vue的v-model绑定 +3. RabbitMQ示例 \ No newline at end of file diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md new file mode 100644 index 0000000..2959af9 --- /dev/null +++ b/mq-demo/ReadMe.md @@ -0,0 +1,169 @@ +# RabbitMQ + +## 基本示例 + +**生产者** + +```java +@SpringBootTest +class MqDemoApplicationTests { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 1. 测试发送消息,示例代码 + * 2. 测试成功的情况 + */ + @Test + void publishTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "你好小球球~~~"); + + Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, JSON.toJSONString(bunny)); + } + +} +``` + +**消费者** + +```java +@Component +@Slf4j +public class MessageListenerOrder { + + @RabbitListener(bindings = @QueueBinding( + exchange = @Exchange(value = EXCHANGE_DIRECT), + value = @Queue(value = QUEUE_NAME, durable = "true"), + key = ROUTING_KEY_DIRECT + ) + ) + public void processMessage(String dataString, Message message, Channel channel) { + System.out.println("消费端接受消息:" + dataString); + } + +} +``` + +## 可靠性 + +在Rabbi中队列和交换机都是持久化的,自动删除都是False。 + +### 生产者确认 + +**配置文件** + +```yaml +rabbitmq: + host: ${bunny.rabbitmq.host} + port: ${bunny.rabbitmq.port} + username: ${bunny.rabbitmq.username} + password: ${bunny.rabbitmq.password} + virtual-host: ${bunny.rabbitmq.virtual-host} + publisher-confirm-type: correlated # 交换机确认 + publisher-returns: true # 队列确认 +``` + +#### RabbitMQ配置 + +> [!NOTE] +> **@PostConstruct 注解** +> **作用**:在Bean依赖注入完成后执行初始化方法(构造器之后,`afterPropertiesSet()`之前)。 +> +> **特点**: +> - 方法需**无参**、返回**void**,名称任意 +> - 执行顺序:构造器 → 依赖注入 → `@PostConstruct` +> - 若抛出异常,Bean创建会失败 +> +> **注意**: +> - 代理类(如`@Transactional`)中,会在**原始对象**初始化时调用 +> - `prototype`作用域的Bean每次创建均会执行 +> - 避免耗时操作,推荐轻量级初始化 +> +> **替代方案**: +> `InitializingBean`接口 或 `@Bean(initMethod="xxx")` + +```java +@Slf4j +@Configuration +@RequiredArgsConstructor +public class RabbitConfiguration implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { + + private final RabbitTemplate rabbitTemplate; + + @PostConstruct + public void initRabbitTemplate() { + rabbitTemplate.setConfirmCallback(this); + rabbitTemplate.setReturnsCallback(this); + } + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + System.out.println("============correlationData <回调函数打印> = " + correlationData); + System.out.println("============ack <输出> = " + ack); + System.out.println("============cause <输出> = " + cause); + } + + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + // 发送到队列失败才会走这个 + log.error("消息主体:{}", returnedMessage.getMessage().getBody()); + log.error("应答码:{}", returnedMessage.getReplyCode()); + log.error("消息使用的父交换机:{}", returnedMessage.getExchange()); + log.error("消息使用的路由键:{}", returnedMessage.getRoutingKey()); + } +} +``` + +**测试失败的情况** + +交换机找不到 + +```java +/* 测试失败交换机的情况 */ +@Test +void publishExchangeErrorTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的消息发送----"); + + Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); + rabbitTemplate.convertAndSend(exchangeDirect + "~", routingKeyDirect, JSON.toJSONString(bunny)); +} +``` + +队列找不到 + +```java +/* 测试失败队列的情况 */ +@Test +void publishQueueErrorTest() { + String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT; + String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT; + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "----失败的队列发送----"); + + Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build(); + rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect + "~", JSON.toJSONString(bunny)); +} +``` + +#### 备份交换机 + +> [!NOTE] +> +> 创建好的交换机是无法修改的,只能删除重新建立。 + +创建广播类型的交换机,因为没有路由键,只能通过广播的方式去找。 + +![image-20250519104914787](./images/image-20250519104914787.png) + +创建与备份交换机的队列,交换机是广播的模式,不指定路由键。 + +![image-20250519105042156](./images/image-20250519105042156.png) + +通过指定`Alternate exchange`的交换机进行绑定。第一个填写的不是备份交换机,是投递交换机,之后通过`Alternate exchange`绑定备份的交换机。 + +![image-20250519105250365](./images/image-20250519105250365.png) \ No newline at end of file diff --git a/mq-demo/images/image-20250519104914787.png b/mq-demo/images/image-20250519104914787.png new file mode 100644 index 0000000..be283cb Binary files /dev/null and b/mq-demo/images/image-20250519104914787.png differ diff --git a/mq-demo/images/image-20250519105042156.png b/mq-demo/images/image-20250519105042156.png new file mode 100644 index 0000000..0a5b828 Binary files /dev/null and b/mq-demo/images/image-20250519105042156.png differ diff --git a/mq-demo/images/image-20250519105250365.png b/mq-demo/images/image-20250519105250365.png new file mode 100644 index 0000000..5abaec3 Binary files /dev/null and b/mq-demo/images/image-20250519105250365.png differ diff --git a/mq-demo/push.sh b/mq-demo/push.sh deleted file mode 100644 index 7b9a8ff..0000000 --- a/mq-demo/push.sh +++ /dev/null @@ -1,5 +0,0 @@ -git checkout master -git merge dev -git push --all -git push --tags -git checkout dev diff --git a/mq-demo/src/main/resources/application.yaml b/mq-demo/src/main/resources/application.yaml index 604d28c..b2cde8c 100644 --- a/mq-demo/src/main/resources/application.yaml +++ b/mq-demo/src/main/resources/application.yaml @@ -13,11 +13,11 @@ spring: username: ${bunny.rabbitmq.username} password: ${bunny.rabbitmq.password} virtual-host: ${bunny.rabbitmq.virtual-host} - publisher-confirm-type: correlated # 交换机确认 - publisher-returns: true # 队列确认 - # listener: - # simple: - # acknowledge-mode: manual # 手动处理消息 + # publisher-confirm-type: correlated # 交换机确认 + # publisher-returns: true # 队列确认 + listener: + simple: + acknowledge-mode: manual # 手动处理消息 # connection-timeout: 1s # 设置MQ连接超时时间 # template: # retry: diff --git a/mq-demo/src/main/resources/templates/index.html b/mq-demo/src/main/resources/templates/index.html index 2b089e4..a81542e 100644 --- a/mq-demo/src/main/resources/templates/index.html +++ b/mq-demo/src/main/resources/templates/index.html @@ -147,7 +147,8 @@

通过实践项目学习消息队列的核心概念与应用场景

探索项目 - Gitee代码
@@ -291,60 +292,46 @@
-

生产者配置

+

生产者示例

-
@Configuration
-public class RabbitMQConfig {
+                    

+@Autowired
+private RabbitTemplate rabbitTemplate;
 
-    @Bean
-    public Queue helloQueue() {
-        return new Queue("hello", true);
-    }
+/* 测试发送消息 */
+@Test
+void publishTest() {
+    String exchangeDirect = RabbitMQMessageListenerConstants.EXCHANGE_DIRECT;
+    String routingKeyDirect = RabbitMQMessageListenerConstants.ROUTING_KEY_DIRECT;
+    rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, "你好小球球~~~");
 
-    @Bean
-    public DirectExchange directExchange() {
-        return new DirectExchange("direct.exchange");
-    }
-
-    @Bean
-    public Binding binding(Queue helloQueue,
-                         DirectExchange directExchange) {
-        return BindingBuilder.bind(helloQueue)
-               .to(directExchange)
-               .with("routing.key");
-    }
+    Bunny bunny = Bunny.builder().rabbitName("Bunny").age(2).build();
+    rabbitTemplate.convertAndSend(exchangeDirect, routingKeyDirect, JSON.toJSONString(bunny));
+}
 }

消费者示例

-
@Component
-@RabbitListener(queues = "hello")
-public class MessageConsumer {
-
-    private static final Logger logger =
-        LoggerFactory.getLogger(MessageConsumer.class);
-
-    @RabbitHandler
-    public void receive(String message,
-                      Channel channel,
-                      @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
-        try {
-            logger.info("Received message: {}", message);
-            // 业务处理逻辑
-            channel.basicAck(tag, false);
-        } catch (Exception e) {
-            channel.basicNack(tag, false, true);
-        }
-    }
-}
+

+@RabbitListener(bindings = @QueueBinding(
+        exchange = @Exchange(value = EXCHANGE_DIRECT),
+        value = @Queue(value = QUEUE_NAME, durable = "true"),
+        key = ROUTING_KEY_DIRECT
+)
+)
+public void processMessage(String dataString, Message message, Channel channel) {
+    System.out.println("消费端接受消息:" + dataString);
+}
+                    
- 查看完整代码