From b6b2972e89540ad8295331ff5789633d28ef26f2 Mon Sep 17 00:00:00 2001 From: bunny <1319900154@qq.com> Date: Fri, 10 May 2024 09:56:29 +0800 Subject: [PATCH] =?UTF-8?q?feat(rabbitMq):=20RabbitMQ=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- module/module-rabbitMQ/pom.xml | 5 +++ .../rabbitMQ/config/RabbitMqConfig.java | 38 +++++++++++++++++++ .../rabbitMQ/consumer/DelayConsumer.java | 9 +++++ .../rabbitMQ/consumer/DirectConsumer.java | 24 ++++++++++++ .../rabbitMQ/consumer/ErrorConsumer.java | 9 +++++ .../rabbitMQ/consumer/FanoutConsumer.java | 9 +++++ .../rabbitMQ/consumer/LazyConsumer.java | 9 +++++ .../rabbitMQ/consumer/TopicConsumer.java | 9 +++++ .../rabbitMQ/listener/DelayListener.java | 9 +++++ .../rabbitMQ/listener/DirectListener.java | 30 +++++++++++++++ .../rabbitMQ/listener/ErrorListener.java | 9 +++++ .../rabbitMQ/listener/FanoutListener.java | 9 +++++ .../rabbitMQ/listener/LazyListener.java | 9 +++++ .../rabbitMQ/listener/TopicListener.java | 9 +++++ 14 files changed, 187 insertions(+) create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DelayListener.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/ErrorListener.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/FanoutListener.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/LazyListener.java create mode 100644 module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/TopicListener.java diff --git a/module/module-rabbitMQ/pom.xml b/module/module-rabbitMQ/pom.xml index b2c5090..29884a1 100644 --- a/module/module-rabbitMQ/pom.xml +++ b/module/module-rabbitMQ/pom.xml @@ -26,5 +26,10 @@ org.springframework.amqp spring-rabbit-test + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + 2.16.0-rc1 + diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java new file mode 100644 index 0000000..99abbc7 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/config/RabbitMqConfig.java @@ -0,0 +1,38 @@ +package cn.bunny.module.rabbitMQ.config; + +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class RabbitMqConfig { + @Autowired + private RabbitTemplate rabbitTemplate; + + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(returned -> { + log.error("触发return callback,"); + log.debug("exchange: {}", returned.getExchange()); + log.debug("routingKey: {}", returned.getRoutingKey()); + log.debug("message: {}", returned.getMessage()); + log.debug("replyCode: {}", returned.getReplyCode()); + log.debug("replyText: {}", returned.getReplyText()); + }); + } + + @Bean + public MessageConverter messageConverter() { + // 1.定义消息转换器 + Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); + // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 + converter.setCreateMessageIds(true); + return converter; + } +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java new file mode 100644 index 0000000..ac1874d --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DelayConsumer.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class DelayConsumer { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java new file mode 100644 index 0000000..6b3bc85 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/DirectConsumer.java @@ -0,0 +1,24 @@ +package cn.bunny.module.rabbitMQ.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class DirectConsumer { + @Autowired + RabbitTemplate rabbitTemplate; + + /** + * 发送红色消息 + */ + @Test + void testSendDirectRed() throws Exception { + for (int i = 0; i < 1000; i++) { + rabbitTemplate.convertAndSend("bunny.direct", "red", "发送消息:" + i); + } + } +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java new file mode 100644 index 0000000..583823e --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/ErrorConsumer.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ErrorConsumer { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java new file mode 100644 index 0000000..0f65252 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/FanoutConsumer.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class FanoutConsumer { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java new file mode 100644 index 0000000..30b451f --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/LazyConsumer.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class LazyConsumer { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java new file mode 100644 index 0000000..d6a8de0 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/consumer/TopicConsumer.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class TopicConsumer { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DelayListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DelayListener.java new file mode 100644 index 0000000..318cd63 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DelayListener.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class DelayListener { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java new file mode 100644 index 0000000..aa83490 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/DirectListener.java @@ -0,0 +1,30 @@ +package cn.bunny.module.rabbitMQ.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class DirectListener { + /** + * * 监听者2 + * 创建队列 持久化的、不自动删除 + * 创建交换机 持久化的、不自动删除 + * key包含 red 和 yellow + * + * @param message 接受消息 + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(name = "direct.queue2", durable = "true", autoDelete = "false"), + exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"), + key = {"red", "yellow"} + )) + public void listenDirectQueue2(String message) { + System.out.println("消费者2接收到 Direct key 为 {\"red\", \"yellow\"} 消息:【" + message + "】"); + } +} \ No newline at end of file diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/ErrorListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/ErrorListener.java new file mode 100644 index 0000000..22d2529 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/ErrorListener.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ErrorListener { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/FanoutListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/FanoutListener.java new file mode 100644 index 0000000..2d4d0ff --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/FanoutListener.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class FanoutListener { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/LazyListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/LazyListener.java new file mode 100644 index 0000000..c682154 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/LazyListener.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class LazyListener { +} diff --git a/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/TopicListener.java b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/TopicListener.java new file mode 100644 index 0000000..9363906 --- /dev/null +++ b/module/module-rabbitMQ/src/main/java/cn/bunny/module/rabbitMQ/listener/TopicListener.java @@ -0,0 +1,9 @@ +package cn.bunny.module.rabbitMQ.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class TopicListener { +}