feat(新增): 🚀 添加rabbitMQ示例

This commit is contained in:
bunny 2024-09-09 15:20:36 +08:00
parent 6cfc73d3cf
commit 85cca2f90e
14 changed files with 171 additions and 123 deletions

View File

@ -2,37 +2,47 @@ package cn.bunny.module.rabbitMQ.config;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component; import org.springframework.lang.NonNull;
@Component @Configuration
@Slf4j @Slf4j
public class RabbitMqConfig { public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired @Autowired
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
@PostConstruct @PostConstruct
public void init() { public void initRabbitTemplate() {
rabbitTemplate.setReturnsCallback(returned -> { rabbitTemplate.setConfirmCallback(this);
log.error("触发return callback,"); rabbitTemplate.setReturnsCallback(this);
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
});
} }
@Bean @Override
public MessageConverter messageConverter() { public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 1.定义消息转换器 log.info("confirm()同调函数打印 CorrelationData:{}", correlationData);
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); log.info("confirm()回调函数打印 ack:{}", ack);
// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息 log.info("confirm()回调函数打印 cause:{}", cause);
converter.setCreateMessageIds(true);
return converter;
} }
@Override
public void returnedMessage(@NonNull ReturnedMessage returned) {
log.info("returnedMessage()回调函数 消息主体:{}", new String(returned.getMessage().getBody()));
log.info("returnedMessage()回调函数 应答码:{}", returned.getReplyCode());
log.info("returnedMessage()回调函数 描述:{}", returned.getReplyText());
log.info("returnedMessage()回调函数 消息使用的交换器 exchange :{}", returned.getExchange());
log.info("returnedMessage()回调函数 消息使用的路由键 routing :{}", returned.getRoutingKey());
}
// @Bean
// public MessageConverter messageConverter() {
// // 1.定义消息转换器
// Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
// // 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息
// converter.setCreateMessageIds(true);
// return converter;
// }
} }

View File

@ -1,9 +0,0 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DelayConsumer {
}

View File

@ -1,24 +0,0 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DirectConsumer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 发送红色消息
*/
@Test
void testSendDirectRed() throws Exception {
for (int i = 0; i < 1000; i++) {
rabbitTemplate.convertAndSend("bunny.direct", "red", "发送消息:" + i);
}
}
}

View File

@ -1,9 +0,0 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ErrorConsumer {
}

View File

@ -1,9 +0,0 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class FanoutConsumer {
}

View File

@ -1,9 +0,0 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class LazyConsumer {
}

View File

@ -1,9 +0,0 @@
package cn.bunny.module.rabbitMQ.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TopicConsumer {
}

View File

@ -1,30 +1,51 @@
package cn.bunny.module.rabbitMQ.listener; package cn.bunny.module.rabbitMQ.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component @Component
@Slf4j @Slf4j
public class DirectListener { public class DirectListener {
/**
* * 监听者2
* 创建队列 持久化的不自动删除
* 创建交换机 持久化的不自动删除
* key包含 red yellow
*
* @param message 接受消息
*/
@RabbitListener(bindings = @QueueBinding( @RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true", autoDelete = "false"), exchange = @Exchange(name = "exchange.direct.order"),
exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"), key = {"order"},
key = {"red", "yellow"} value = @Queue(name = "queue.direct.order", durable = "true", autoDelete = "false"),
arguments = {
//
@Argument(name = "alternate-exchange", value = "exchange.fanout.order.backup"),
// 惰性队列
@Argument(name = "x-queue-mode", value = "lazy")
}
)) ))
public void listenDirectQueue2(String message) { public void listenDirectQueue2(String dataString, Message message, Channel channel) throws IOException {
System.out.println("消费者2接收到 Direct key 为 {\"red\", \"yellow\"} 消息:【" + message + ""); long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("消费者2接收到 Direct key 为 order 消息:【" + dataString + "");
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
// 是否之前投递过
Boolean redelivered = message.getMessageProperties().getRedelivered();
System.out.println("是否投递过:" + redelivered);
if (!redelivered) {
// 是否重复投递 true
channel.basicNack(deliveryTag, false, true);
throw new RuntimeException(e);
}
}
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = "exchange.fanout.order.backup", type = ExchangeTypes.FANOUT),
value = @Queue(name = "queue.order.backup", durable = "true", autoDelete = "false")
))
public void listenBackup(String message) {
log.info("备份交换机:{}", message);
} }
} }

View File

@ -25,6 +25,13 @@
<artifactId>service-utils</artifactId> <artifactId>service-utils</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
<!-- rabbitMQ -->
<dependency>
<groupId>cn.bunny</groupId>
<artifactId>module-rabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- security -->
<dependency> <dependency>
<groupId>cn.bunny</groupId> <groupId>cn.bunny</groupId>
<artifactId>spring-security</artifactId> <artifactId>spring-security</artifactId>

View File

@ -0,0 +1,12 @@
package cn.bunny.service.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PublishDirect {
@Autowired
private RabbitTemplate rabbitTemplate;
}

View File

@ -1,19 +1,26 @@
bunny: bunny:
datasource: datasource:
host: 192.168.3.100 host: 192.168.3.98
port: 3305 port: 3306
sqlData: bunny_docs sqlData: bunny_docs
username: root username: root
password: "02120212" password: "123456"
redis: redis:
host: 192.168.3.100 host: 192.168.3.98
port: 6379 port: 6379
database: 0 database: 0
password: "123456" password: "123456"
minio: minio:
endpointUrl: "http://192.168.3.100:9000" endpointUrl: "http://192.168.3.98:9000"
bucket-name: test bucket-name: test
accessKey: bunny accessKey: bunny
secretKey: "02120212" secretKey: "02120212"
rabbitmq:
host: 192.168.3.98
port: 5672
username: admin
password: "admin"
virtual-host: /

View File

@ -2,23 +2,30 @@
knife4j: knife4j:
enable: true enable: true
production: true production: true
bunny: bunny:
datasource: datasource:
host: 192.168.3.100 host: 192.168.3.98
port: 3306 port: 3306
sqlData: bunny_docs sqlData: bunny_docs
username: root username: root
password: "02120212" password: "123456"
redis: redis:
host: 192.168.3.100 host: 192.168.3.98
port: 6379 port: 6379
database: 0 database: 0
password: "123456" password: "123456"
minio: minio:
endpointUrl: "http://192.168.3.100:9000" endpointUrl: "http://192.168.3.98:9000"
bucket-name: test bucket-name: test
accessKey: bunny accessKey: bunny
secretKey: "02120212" secretKey: "02120212"
rabbitmq:
host: 192.168.3.98
port: 5672
username: admin
password: "admin"
virtual-host: /

View File

@ -44,19 +44,39 @@ spring:
date-format: yyyy-MM-dd HH:mm:ss date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8 time-zone: GMT+8
rabbitmq:
host: ${bunny.rabbitmq.host}
port: ${bunny.rabbitmq.port}
username: ${bunny.rabbitmq.username}
password: ${bunny.rabbitmq.password}
virtual-host: ${bunny.rabbitmq.virtual-host}
# publisher-confirm-type: correlated # 交换机确认
# publisher-returns: true # 队列确认
# listener:
# simple:
# acknowledge-mode: manual # 手动处理消息
# connection-timeout: 1s # 设置MQ连接超时时间
# template:
# retry:
# enabled: true # 失败重试
# initial-interval: 1000ms # 失败后初始时间
# multiplier: 1 # 失败后下次等待时长倍数 initial-interval * multiplier
# max-attempts: 3 # 最大重试次数
mybatis-plus: mybatis-plus:
mapper-locations: classpath:mapper/*.xml mapper-locations: classpath:mapper/*.xml
global-config: global-config:
db-config: db-config:
logic-delete-field: isDelete logic-delete-field: isDelete
configuration: # configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 查看日志 # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 查看日志
logging: logging:
level: level:
cn.bunny.service.mapper: debug cn.bunny.service.mapper: info
cn.bunny.service.controller: info cn.bunny.service.controller: info
cn.bunny.service.service: info cn.bunny.service.service: info
root: info
pattern: pattern:
dateformat: HH:mm:ss:SSS dateformat: HH:mm:ss:SSS
file: file:

View File

@ -0,0 +1,33 @@
package cn.bunny.service.mq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.nio.charset.StandardCharsets;
@SpringBootTest
class PublishDirectTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void publishDirect() throws Exception {
rabbitTemplate.convertAndSend("exchange.direct.order", "order", "消息。。。");
}
@Test
void publishDirect2() {
Message message = MessageBuilder
.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setReceivedRoutingKey("order")
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
for (int i = 0; i < 1000000; i++) {
rabbitTemplate.convertAndSend("exchange.direct.order", "order", "消息。。。");
}
}
}