延迟消息
This commit is contained in:
parent
16cd565dad
commit
2dd828d4ef
|
@ -10,4 +10,9 @@ public class CreateObjectQueue {
|
|||
public Queue ObjectQueue() {
|
||||
return new Queue("object,queue");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue objectQueue1() {
|
||||
return new Queue("object.queue");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package com.example.rabbitmq.config;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
|
||||
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
|
||||
public class ErrorConfig {
|
||||
@Bean
|
||||
public DirectExchange errorExchange() {
|
||||
return new DirectExchange("error.exchange");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue errorQueue() {
|
||||
return new Queue("error.queue");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
|
||||
return BindingBuilder
|
||||
.bind(errorQueue())
|
||||
.to(errorExchange())
|
||||
.with("error");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
|
||||
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange");
|
||||
}
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
package com.example.rabbitmq.config;
|
||||
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
@ -11,4 +13,12 @@ public class MqConfig {
|
|||
public MessageConverter messageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
|
||||
// 设置消息唯一id
|
||||
@Bean
|
||||
public MessageConverter jacksonMessageConverter() {
|
||||
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
|
||||
jsonMessageConverter.setCreateMessageIds(true);
|
||||
return jsonMessageConverter;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,4 +93,17 @@ public class MqListener {
|
|||
public void listenTopicQueue2(String message) {
|
||||
System.out.println("topic.queue2 收到了 【消息]" + message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听延迟队列
|
||||
* @param message 消息
|
||||
*/
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(value = "delay.queue", durable = "true"),
|
||||
exchange = @Exchange(value = "delay.direct", delayed = "true"),
|
||||
key = "hi"
|
||||
))
|
||||
public void listenDelayLazyQueue(String message) {
|
||||
log.info("接收到 lazy.queue消息:{}", message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,10 +8,14 @@ spring:
|
|||
username: root
|
||||
password: 123456
|
||||
port: 5672
|
||||
host: 192.168.31.140
|
||||
# host: 192.168.31.140
|
||||
host: 192.168.2.156
|
||||
listener:
|
||||
simple:
|
||||
prefetch: 1
|
||||
acknowledge-mode: none # 默认机制
|
||||
retry:
|
||||
enabled: true # 开启重试机制
|
||||
connection-timeout: 1s # 连接延迟
|
||||
template:
|
||||
retry:
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package com.example.rabbitmq;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.amqp.AmqpException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessagePostProcessor;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@SpringBootTest
|
||||
public class TestSendDelayMessage {
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* 发送延迟消息
|
||||
* 如果有很多定时消息对CPU压力会很大,如果设置延迟消息很长会对CPU压力很大
|
||||
*/
|
||||
@Test
|
||||
void testSendDelayMessage() {
|
||||
rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor() {
|
||||
@Override
|
||||
public Message postProcessMessage(Message message) throws AmqpException {
|
||||
message.getMessageProperties().setDelay(10000);
|
||||
return message;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue