手动创建消息队列和交换机以及转JSON
This commit is contained in:
parent
ce115dba6f
commit
6325066759
4
pom.xml
4
pom.xml
|
@ -41,6 +41,10 @@
|
|||
<artifactId>spring-rabbit-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-yaml</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package com.example.rabbitmq;
|
||||
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
@SpringBootApplication
|
||||
public class RabbitMqApplication {
|
||||
|
@ -10,4 +13,8 @@ public class RabbitMqApplication {
|
|||
SpringApplication.run(RabbitMqApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageConverter messageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package com.example.rabbitmq.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;import org.springframework.amqp.core.*;
|
||||
|
||||
|
||||
@Configuration
|
||||
public class CreateObjectQueue {
|
||||
@Bean
|
||||
public Queue ObjectQueue() {
|
||||
return new Queue("object,queue");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package com.example.rabbitmq.config;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class DirectConfig {
|
||||
// 创建交换机
|
||||
@Bean
|
||||
public DirectExchange directExchange() {
|
||||
return new DirectExchange("bunny.direct");
|
||||
}
|
||||
|
||||
// 新建队列
|
||||
@Bean
|
||||
public Queue directQueue1() {
|
||||
return new Queue("direct.queue1");
|
||||
}
|
||||
|
||||
// 将交换机添加新的key为red
|
||||
@Bean
|
||||
public Binding directBinding(Queue directQueue1,DirectExchange directExchange) {
|
||||
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding binding1(Queue directQueue1, DirectExchange directExchange) {
|
||||
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package com.example.rabbitmq.config;
|
||||
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class FanoutConfiguration {
|
||||
// 创建新的交换机---bunny.fanout
|
||||
@Bean
|
||||
public FanoutExchange fanoutExchange() {
|
||||
return new FanoutExchange("bunny.fanout");
|
||||
}
|
||||
|
||||
// 创建队列---fanout.queue3
|
||||
@Bean
|
||||
public Queue fanoutQueue() {
|
||||
// QueueBuilder.durable("fanout.queue3").build(); // 创建队列
|
||||
return new Queue("fanout.queue3");
|
||||
}
|
||||
|
||||
// 队列绑定到交换机上
|
||||
@Bean
|
||||
public Binding fanoutBinding(Queue fanoutQueue) {
|
||||
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange());
|
||||
}
|
||||
|
||||
// 创建新的队列---fanout.queue4
|
||||
@Bean
|
||||
public Queue fanoutQueue4() {
|
||||
return new Queue("fanout.queue4");
|
||||
}
|
||||
|
||||
// 创建新的队列---direct.queue2
|
||||
@Bean
|
||||
public Queue directQueue2() {
|
||||
return new Queue("direct.queue2");
|
||||
}
|
||||
|
||||
// 将队列 fanout.queue4 绑定到 交换机上
|
||||
@Bean
|
||||
public Binding fanoutBinding4() {
|
||||
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
|
||||
}
|
||||
}
|
|
@ -1,6 +1,10 @@
|
|||
package com.example.rabbitmq.listener;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.ExchangeTypes;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -19,6 +23,7 @@ public class MqListener {
|
|||
|
||||
/**
|
||||
* 监听 work.queue 中队列消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(queues = "work.queue")
|
||||
|
@ -28,6 +33,7 @@ public class MqListener {
|
|||
|
||||
/**
|
||||
* 监听 work.queue 中队列消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(queues = "work.queue")
|
||||
|
@ -37,14 +43,17 @@ public class MqListener {
|
|||
|
||||
/**
|
||||
* 监听 direct.queue1 中队列消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(queues = "direct.queue1")
|
||||
public void listenDirectQueue1(String message) {
|
||||
System.out.println("direct.queue1 收到了 direct.queue1:【消息】" + message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听 direct.queue2 中队列消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(queues = "direct.queue2")
|
||||
|
@ -54,18 +63,28 @@ public class MqListener {
|
|||
|
||||
/**
|
||||
* 接受消息 topic.queue1
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(queues = "topic.queue1")
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(name = "topic.queue1", durable = "true"),
|
||||
exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT),
|
||||
key = {"red", "blue"}
|
||||
))
|
||||
public void listenTopicQueue1(String message) {
|
||||
System.out.println("topic.queue1 收到了 【消息]" + message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 接受消息 topic.queue1
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(queues = "topic.queue2")
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(name = "topic.queue2", durable = "true"),
|
||||
exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT),
|
||||
key = {"yellow", "red"}
|
||||
))
|
||||
public void listenTopicQueue2(String message) {
|
||||
System.out.println("topic.queue2 收到了 【消息]" + message);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
package com.example.rabbitmq;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
|
||||
@SpringBootTest
|
||||
public class SendMessageTest {
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
// 声明使用JSON,否咋在消息队列中会有很长一段字符串
|
||||
@Test
|
||||
void testSendObject() {
|
||||
Map<String, Object> map = new HashMap<>(2);
|
||||
map.put("name","test");
|
||||
map.put("age", 21);
|
||||
rabbitTemplate.convertAndSend("object,queue", map);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue