feat(rabbitMq): RabbitMQ完成

This commit is contained in:
bunny 2024-05-10 09:56:29 +08:00
parent 512d67d7d3
commit b6b2972e89
14 changed files with 187 additions and 0 deletions

View File

@ -26,5 +26,10 @@
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.16.0-rc1</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,38 @@
package cn.bunny.module.rabbitMQ.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitMqConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(returned -> {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
});
}
@Bean
public MessageConverter messageConverter() {
// 1.定义消息转换器
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息
converter.setCreateMessageIds(true);
return converter;
}
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DelayConsumer {
}

View File

@ -0,0 +1,24 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DirectConsumer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 发送红色消息
*/
@Test
void testSendDirectRed() throws Exception {
for (int i = 0; i < 1000; i++) {
rabbitTemplate.convertAndSend("bunny.direct", "red", "发送消息:" + i);
}
}
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ErrorConsumer {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class FanoutConsumer {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class LazyConsumer {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TopicConsumer {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DelayListener {
}

View File

@ -0,0 +1,30 @@
package cn.bunny.module.rabbitMQ.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DirectListener {
/**
* * 监听者2
* 创建队列 持久化的不自动删除
* 创建交换机 持久化的不自动删除
* key包含 red yellow
*
* @param message 接受消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true", autoDelete = "false"),
exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String message) {
System.out.println("消费者2接收到 Direct key 为 {\"red\", \"yellow\"} 消息:【" + message + "");
}
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ErrorListener {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class FanoutListener {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class LazyListener {
}

View File

@ -0,0 +1,9 @@
package cn.bunny.module.rabbitMQ.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TopicListener {
}