diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index 2eae681..5ab7214 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -1645,3 +1645,53 @@ bunny: **路由键:**routing.key.cluster.test ![image-20250520105149567](./images/image-20250520105149567.png) + +## 集群中的队列 + +### 镜像队列 + +最新版本中已经退出历史舞台了,就不说了。 + +### 仲裁队列 + +当我们在集群中创建队列时,如【集群环境的连接】那一节,可以看到当前的队列是位于,第一个节点上的。如果这时候服务器宕机或者其他情况,这时这个节点上数据没了,其它j节点时访问不到的;我们理想的情况是,如果一台服务器宕机,这台服务器上的所有内容在别的节点上都是可以看到的。 + +![image-20250520141530322](./images/image-20250520141530322.png) + +#### 创建仲裁队列 + +**交换机:**exchange.quorum.test + +**队列:**queue.quorum.test + +**l路由键:**routing.key.quorum.test + +![image-20250520141821885](./images/image-20250520141821885.png) + +#### 测试Code + +**生产者** + +```java +/* 仲裁队列发送消息 */ +@Test +void clusterQuorumTest2() { + final String EXCHANGE = "exchange.quorum.test"; + final String ROUTING_KEY = "routing.key.quorum.test"; + + for (int i = 0; i <= 10; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "仲裁队列发送消息-" + i); + } +} +``` + +**消费者** + +```java +/* 仲裁队列监听 */ +@RabbitListener(queues = "queue.quorum.test") +public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("<<仲裁队列监听下消息>>----{}", dataString); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); +} +``` diff --git a/mq-demo/images/image-20250520141530322.png b/mq-demo/images/image-20250520141530322.png new file mode 100644 index 0000000..037915f Binary files /dev/null and b/mq-demo/images/image-20250520141530322.png differ diff --git a/mq-demo/images/image-20250520141821885.png b/mq-demo/images/image-20250520141821885.png new file mode 100644 index 0000000..38e7cef Binary files /dev/null and b/mq-demo/images/image-20250520141821885.png differ diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java index 035ac40..74d5761 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java @@ -21,5 +21,13 @@ public class ClusterEnvironmentListener { @RabbitListener(queues = "queue.cluster.test") public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException { log.info("<<集群环境下消息>>----{}", dataString); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } + + /* 仲裁队列监听 */ + @RabbitListener(queues = "queue.quorum.test") + public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("<<仲裁队列监听下消息>>----{}", dataString); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } diff --git a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/ClusterEnvironmentTest.java b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/ClusterEnvironmentTest.java index 1ef34e5..d38901e 100644 --- a/mq-demo/src/test/java/cn/bunny/mq/mqdemo/ClusterEnvironmentTest.java +++ b/mq-demo/src/test/java/cn/bunny/mq/mqdemo/ClusterEnvironmentTest.java @@ -20,4 +20,15 @@ public class ClusterEnvironmentTest { "集群环境下消息-" + i); } } + + /* 仲裁队列发送消息 */ + @Test + void clusterQuorumTest2() { + final String EXCHANGE = "exchange.quorum.test"; + final String ROUTING_KEY = "routing.key.quorum.test"; + + for (int i = 0; i <= 10; i++) { + rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "仲裁队列发送消息-" + i); + } + } }