diff --git a/pom.xml b/pom.xml index 760062a..9449c26 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,10 @@ spring-rabbit-test test + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + diff --git a/src/main/java/com/example/rabbitmq/RabbitMqApplication.java b/src/main/java/com/example/rabbitmq/RabbitMqApplication.java index 02e4f74..2f99631 100644 --- a/src/main/java/com/example/rabbitmq/RabbitMqApplication.java +++ b/src/main/java/com/example/rabbitmq/RabbitMqApplication.java @@ -1,7 +1,10 @@ package com.example.rabbitmq; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitMqApplication { @@ -10,4 +13,8 @@ public class RabbitMqApplication { SpringApplication.run(RabbitMqApplication.class, args); } + @Bean + public MessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } } diff --git a/src/main/java/com/example/rabbitmq/config/CreateObjectQueue.java b/src/main/java/com/example/rabbitmq/config/CreateObjectQueue.java new file mode 100644 index 0000000..5cbda9a --- /dev/null +++ b/src/main/java/com/example/rabbitmq/config/CreateObjectQueue.java @@ -0,0 +1,13 @@ +package com.example.rabbitmq.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration;import org.springframework.amqp.core.*; + + +@Configuration +public class CreateObjectQueue { + @Bean + public Queue ObjectQueue() { + return new Queue("object,queue"); + } +} diff --git a/src/main/java/com/example/rabbitmq/config/DirectConfig.java b/src/main/java/com/example/rabbitmq/config/DirectConfig.java new file mode 100644 index 0000000..91a1530 --- /dev/null +++ b/src/main/java/com/example/rabbitmq/config/DirectConfig.java @@ -0,0 +1,34 @@ +package com.example.rabbitmq.config; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DirectConfig { + // 创建交换机 + @Bean + public DirectExchange directExchange() { + return new DirectExchange("bunny.direct"); + } + + // 新建队列 + @Bean + public Queue directQueue1() { + return new Queue("direct.queue1"); + } + + // 将交换机添加新的key为red + @Bean + public Binding directBinding(Queue directQueue1,DirectExchange directExchange) { + return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); + } + + @Bean + public Binding binding1(Queue directQueue1, DirectExchange directExchange) { + return BindingBuilder.bind(directQueue1).to(directExchange).with("blue"); + } +} diff --git a/src/main/java/com/example/rabbitmq/config/FanoutConfiguration.java b/src/main/java/com/example/rabbitmq/config/FanoutConfiguration.java new file mode 100644 index 0000000..38dca26 --- /dev/null +++ b/src/main/java/com/example/rabbitmq/config/FanoutConfiguration.java @@ -0,0 +1,45 @@ +package com.example.rabbitmq.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class FanoutConfiguration { + // 创建新的交换机---bunny.fanout + @Bean + public FanoutExchange fanoutExchange() { + return new FanoutExchange("bunny.fanout"); + } + + // 创建队列---fanout.queue3 + @Bean + public Queue fanoutQueue() { + // QueueBuilder.durable("fanout.queue3").build(); // 创建队列 + return new Queue("fanout.queue3"); + } + + // 队列绑定到交换机上 + @Bean + public Binding fanoutBinding(Queue fanoutQueue) { + return BindingBuilder.bind(fanoutQueue).to(fanoutExchange()); + } + + // 创建新的队列---fanout.queue4 + @Bean + public Queue fanoutQueue4() { + return new Queue("fanout.queue4"); + } + + // 创建新的队列---direct.queue2 + @Bean + public Queue directQueue2() { + return new Queue("direct.queue2"); + } + + // 将队列 fanout.queue4 绑定到 交换机上 + @Bean + public Binding fanoutBinding4() { + return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange()); + } +} diff --git a/src/main/java/com/example/rabbitmq/listener/MqListener.java b/src/main/java/com/example/rabbitmq/listener/MqListener.java index 844666e..dae333e 100644 --- a/src/main/java/com/example/rabbitmq/listener/MqListener.java +++ b/src/main/java/com/example/rabbitmq/listener/MqListener.java @@ -1,6 +1,10 @@ package com.example.rabbitmq.listener; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @@ -19,6 +23,7 @@ public class MqListener { /** * 监听 work.queue 中队列消息 + * * @param message 消息 */ @RabbitListener(queues = "work.queue") @@ -28,6 +33,7 @@ public class MqListener { /** * 监听 work.queue 中队列消息 + * * @param message 消息 */ @RabbitListener(queues = "work.queue") @@ -37,14 +43,17 @@ public class MqListener { /** * 监听 direct.queue1 中队列消息 + * * @param message 消息 */ @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String message) { System.out.println("direct.queue1 收到了 direct.queue1:【消息】" + message); } + /** * 监听 direct.queue2 中队列消息 + * * @param message 消息 */ @RabbitListener(queues = "direct.queue2") @@ -54,18 +63,28 @@ public class MqListener { /** * 接受消息 topic.queue1 + * * @param message 消息 */ - @RabbitListener(queues = "topic.queue1") + @RabbitListener(bindings = @QueueBinding( + value = @Queue(name = "topic.queue1", durable = "true"), + exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT), + key = {"red", "blue"} + )) public void listenTopicQueue1(String message) { System.out.println("topic.queue1 收到了 【消息]" + message); } /** * 接受消息 topic.queue1 + * * @param message 消息 */ - @RabbitListener(queues = "topic.queue2") + @RabbitListener(bindings = @QueueBinding( + value = @Queue(name = "topic.queue2", durable = "true"), + exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT), + key = {"yellow", "red"} + )) public void listenTopicQueue2(String message) { System.out.println("topic.queue2 收到了 【消息]" + message); } diff --git a/src/test/java/com/example/rabbitmq/SendMessageTest.java b/src/test/java/com/example/rabbitmq/SendMessageTest.java new file mode 100644 index 0000000..ffb240e --- /dev/null +++ b/src/test/java/com/example/rabbitmq/SendMessageTest.java @@ -0,0 +1,27 @@ +package com.example.rabbitmq; + +import org.junit.jupiter.api.Test; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; + +@SpringBootTest +public class SendMessageTest { + @Autowired + private RabbitTemplate rabbitTemplate; + + // 声明使用JSON,否咋在消息队列中会有很长一段字符串 + @Test + void testSendObject() { + Map map = new HashMap<>(2); + map.put("name","test"); + map.put("age", 21); + rabbitTemplate.convertAndSend("object,queue", map); + } +}