From 85cca2f90efe32b293c4b4387681584a5ead1f2a Mon Sep 17 00:00:00 2001
From: bunny <1319900154@qq.com>
Date: Mon, 9 Sep 2024 15:20:36 +0800
Subject: [PATCH] =?UTF-8?q?feat(=E6=96=B0=E5=A2=9E):=20:rocket:=20?=
=?UTF-8?q?=E6=B7=BB=E5=8A=A0rabbitMQ=E7=A4=BA=E4=BE=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../rabbitMQ/config/RabbitMqConfig.java | 54 ++++++++++--------
.../rabbitMQ/consumer/DelayConsumer.java | 9 ---
.../rabbitMQ/consumer/DirectConsumer.java | 24 --------
.../rabbitMQ/consumer/ErrorConsumer.java | 9 ---
.../rabbitMQ/consumer/FanoutConsumer.java | 9 ---
.../rabbitMQ/consumer/LazyConsumer.java | 9 ---
.../rabbitMQ/consumer/TopicConsumer.java | 9 ---
.../rabbitMQ/listener/DirectListener.java | 55 +++++++++++++------
service/pom.xml | 7 +++
.../cn/bunny/service/mq/PublishDirect.java | 12 ++++
.../src/main/resources/application-dev.yml | 19 +++++--
.../src/main/resources/application-prod.yml | 19 +++++--
service/src/main/resources/application.yml | 26 ++++++++-
.../bunny/service/mq/PublishDirectTest.java | 33 +++++++++++
14 files changed, 171 insertions(+), 123 deletions(-)
delete mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java
delete mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java
delete mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java
delete mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java
delete mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java
delete mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java
create mode 100644 service/src/main/java/cn/bunny/service/mq/PublishDirect.java
create mode 100644 service/src/test/java/cn/bunny/service/mq/PublishDirectTest.java
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java
index 99abbc7..f033ced 100644
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java
+++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java
@@ -2,37 +2,47 @@ package cn.bunny.module.rabbitMQ.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.ReturnedMessage;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
-import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.lang.NonNull;
-@Component
+@Configuration
@Slf4j
-public class RabbitMqConfig {
+public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
- public void init() {
- rabbitTemplate.setReturnsCallback(returned -> {
- log.error("触发return callback,");
- log.debug("exchange: {}", returned.getExchange());
- log.debug("routingKey: {}", returned.getRoutingKey());
- log.debug("message: {}", returned.getMessage());
- log.debug("replyCode: {}", returned.getReplyCode());
- log.debug("replyText: {}", returned.getReplyText());
- });
+ public void initRabbitTemplate() {
+ rabbitTemplate.setConfirmCallback(this);
+ rabbitTemplate.setReturnsCallback(this);
}
- @Bean
- public MessageConverter messageConverter() {
- // 1.定义消息转换器
- Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- converter.setCreateMessageIds(true);
- return converter;
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ log.info("confirm()同调函数打印 CorrelationData:{}", correlationData);
+ log.info("confirm()回调函数打印 ack:{}", ack);
+ log.info("confirm()回调函数打印 cause:{}", cause);
}
+
+ @Override
+ public void returnedMessage(@NonNull ReturnedMessage returned) {
+ log.info("returnedMessage()回调函数 消息主体:{}", new String(returned.getMessage().getBody()));
+ log.info("returnedMessage()回调函数 应答码:{}", returned.getReplyCode());
+ log.info("returnedMessage()回调函数 描述:{}", returned.getReplyText());
+ log.info("returnedMessage()回调函数 消息使用的交换器 exchange :{}", returned.getExchange());
+ log.info("returnedMessage()回调函数 消息使用的路由键 routing :{}", returned.getRoutingKey());
+ }
+
+ // @Bean
+ // public MessageConverter messageConverter() {
+ // // 1.定义消息转换器
+ // Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
+ // // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
+ // converter.setCreateMessageIds(true);
+ // return converter;
+ // }
}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java
deleted file mode 100644
index ac1874d..0000000
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package cn.bunny.module.rabbitMQ.consumer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class DelayConsumer {
-}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java
deleted file mode 100644
index 6b3bc85..0000000
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package cn.bunny.module.rabbitMQ.consumer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Test;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class DirectConsumer {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * 发送红色消息
- */
- @Test
- void testSendDirectRed() throws Exception {
- for (int i = 0; i < 1000; i++) {
- rabbitTemplate.convertAndSend("bunny.direct", "red", "发送消息:" + i);
- }
- }
-}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java
deleted file mode 100644
index 583823e..0000000
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package cn.bunny.module.rabbitMQ.consumer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class ErrorConsumer {
-}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java
deleted file mode 100644
index 0f65252..0000000
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package cn.bunny.module.rabbitMQ.consumer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class FanoutConsumer {
-}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java
deleted file mode 100644
index 30b451f..0000000
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package cn.bunny.module.rabbitMQ.consumer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class LazyConsumer {
-}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java
deleted file mode 100644
index d6a8de0..0000000
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package cn.bunny.module.rabbitMQ.consumer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class TopicConsumer {
-}
diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java
index aa83490..3b8b17e 100644
--- a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java
+++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java
@@ -1,30 +1,51 @@
package cn.bunny.module.rabbitMQ.listener;
+import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
-import org.springframework.amqp.rabbit.annotation.Exchange;
-import org.springframework.amqp.rabbit.annotation.Queue;
-import org.springframework.amqp.rabbit.annotation.QueueBinding;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
+import java.io.IOException;
+
@Component
@Slf4j
public class DirectListener {
- /**
- * * 监听者2
- * 创建队列 持久化的、不自动删除
- * 创建交换机 持久化的、不自动删除
- * key包含 red 和 yellow
- *
- * @param message 接受消息
- */
@RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2", durable = "true", autoDelete = "false"),
- exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
- key = {"red", "yellow"}
+ exchange = @Exchange(name = "exchange.direct.order"),
+ key = {"order"},
+ value = @Queue(name = "queue.direct.order", durable = "true", autoDelete = "false"),
+ arguments = {
+ //
+ @Argument(name = "alternate-exchange", value = "exchange.fanout.order.backup"),
+ // 惰性队列
+ @Argument(name = "x-queue-mode", value = "lazy")
+ }
))
- public void listenDirectQueue2(String message) {
- System.out.println("消费者2接收到 Direct key 为 {\"red\", \"yellow\"} 消息:【" + message + "】");
+ public void listenDirectQueue2(String dataString, Message message, Channel channel) throws IOException {
+ long deliveryTag = message.getMessageProperties().getDeliveryTag();
+
+ try {
+ System.out.println("消费者2接收到 Direct key 为 order 消息:【" + dataString + "】");
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ // 是否之前投递过
+ Boolean redelivered = message.getMessageProperties().getRedelivered();
+ System.out.println("是否投递过:" + redelivered);
+ if (!redelivered) {
+ // 是否重复投递 :true
+ channel.basicNack(deliveryTag, false, true);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @RabbitListener(bindings = @QueueBinding(
+ exchange = @Exchange(name = "exchange.fanout.order.backup", type = ExchangeTypes.FANOUT),
+ value = @Queue(name = "queue.order.backup", durable = "true", autoDelete = "false")
+ ))
+ public void listenBackup(String message) {
+ log.info("备份交换机:{}", message);
}
}
\ No newline at end of file
diff --git a/service/pom.xml b/service/pom.xml
index a3371a5..c619113 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -25,6 +25,13 @@
service-utils
0.0.1-SNAPSHOT
+
+
+ cn.bunny
+ module-rabbitMQ
+ 0.0.1-SNAPSHOT
+
+
cn.bunny
spring-security
diff --git a/service/src/main/java/cn/bunny/service/mq/PublishDirect.java b/service/src/main/java/cn/bunny/service/mq/PublishDirect.java
new file mode 100644
index 0000000..849cbe0
--- /dev/null
+++ b/service/src/main/java/cn/bunny/service/mq/PublishDirect.java
@@ -0,0 +1,12 @@
+package cn.bunny.service.mq;
+
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class PublishDirect {
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+}
diff --git a/service/src/main/resources/application-dev.yml b/service/src/main/resources/application-dev.yml
index bd3361d..849f519 100644
--- a/service/src/main/resources/application-dev.yml
+++ b/service/src/main/resources/application-dev.yml
@@ -1,19 +1,26 @@
bunny:
datasource:
- host: 192.168.3.100
- port: 3305
+ host: 192.168.3.98
+ port: 3306
sqlData: bunny_docs
username: root
- password: "02120212"
+ password: "123456"
redis:
- host: 192.168.3.100
+ host: 192.168.3.98
port: 6379
database: 0
password: "123456"
minio:
- endpointUrl: "http://192.168.3.100:9000"
+ endpointUrl: "http://192.168.3.98:9000"
bucket-name: test
accessKey: bunny
- secretKey: "02120212"
\ No newline at end of file
+ secretKey: "02120212"
+
+ rabbitmq:
+ host: 192.168.3.98
+ port: 5672
+ username: admin
+ password: "admin"
+ virtual-host: /
\ No newline at end of file
diff --git a/service/src/main/resources/application-prod.yml b/service/src/main/resources/application-prod.yml
index de6c595..1b421d3 100644
--- a/service/src/main/resources/application-prod.yml
+++ b/service/src/main/resources/application-prod.yml
@@ -2,23 +2,30 @@
knife4j:
enable: true
production: true
-
+
bunny:
datasource:
- host: 192.168.3.100
+ host: 192.168.3.98
port: 3306
sqlData: bunny_docs
username: root
- password: "02120212"
+ password: "123456"
redis:
- host: 192.168.3.100
+ host: 192.168.3.98
port: 6379
database: 0
password: "123456"
minio:
- endpointUrl: "http://192.168.3.100:9000"
+ endpointUrl: "http://192.168.3.98:9000"
bucket-name: test
accessKey: bunny
- secretKey: "02120212"
\ No newline at end of file
+ secretKey: "02120212"
+
+ rabbitmq:
+ host: 192.168.3.98
+ port: 5672
+ username: admin
+ password: "admin"
+ virtual-host: /
\ No newline at end of file
diff --git a/service/src/main/resources/application.yml b/service/src/main/resources/application.yml
index 22ee819..076e9bc 100644
--- a/service/src/main/resources/application.yml
+++ b/service/src/main/resources/application.yml
@@ -44,19 +44,39 @@ spring:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
+ rabbitmq:
+ host: ${bunny.rabbitmq.host}
+ port: ${bunny.rabbitmq.port}
+ username: ${bunny.rabbitmq.username}
+ password: ${bunny.rabbitmq.password}
+ virtual-host: ${bunny.rabbitmq.virtual-host}
+# publisher-confirm-type: correlated # 交换机确认
+# publisher-returns: true # 队列确认
+# listener:
+# simple:
+# acknowledge-mode: manual # 手动处理消息
+# connection-timeout: 1s # 设置MQ连接超时时间
+# template:
+# retry:
+# enabled: true # 失败重试
+# initial-interval: 1000ms # 失败后初始时间
+# multiplier: 1 # 失败后下次等待时长倍数 initial-interval * multiplier
+# max-attempts: 3 # 最大重试次数
+
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
global-config:
db-config:
logic-delete-field: isDelete
- configuration:
- log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 查看日志
+# configuration:
+# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 查看日志
logging:
level:
- cn.bunny.service.mapper: debug
+ cn.bunny.service.mapper: info
cn.bunny.service.controller: info
cn.bunny.service.service: info
+ root: info
pattern:
dateformat: HH:mm:ss:SSS
file:
diff --git a/service/src/test/java/cn/bunny/service/mq/PublishDirectTest.java b/service/src/test/java/cn/bunny/service/mq/PublishDirectTest.java
new file mode 100644
index 0000000..8d0718e
--- /dev/null
+++ b/service/src/test/java/cn/bunny/service/mq/PublishDirectTest.java
@@ -0,0 +1,33 @@
+package cn.bunny.service.mq;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageDeliveryMode;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.nio.charset.StandardCharsets;
+
+@SpringBootTest
+class PublishDirectTest {
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ @Test
+ void publishDirect() throws Exception {
+ rabbitTemplate.convertAndSend("exchange.direct.order", "order", "消息。。。");
+ }
+
+ @Test
+ void publishDirect2() {
+ Message message = MessageBuilder
+ .withBody("hello".getBytes(StandardCharsets.UTF_8))
+ .setReceivedRoutingKey("order")
+ .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
+ for (int i = 0; i < 1000000; i++) {
+ rabbitTemplate.convertAndSend("exchange.direct.order", "order", "消息。。。");
+ }
+ }
+}
\ No newline at end of file