✨ 仲裁队列完成;
This commit is contained in:
parent
6fcf7a4b75
commit
84c2d06b0c
|
@ -1645,3 +1645,53 @@ bunny:
|
|||
**路由键:**routing.key.cluster.test
|
||||
|
||||

|
||||
|
||||
## 集群中的队列
|
||||
|
||||
### 镜像队列
|
||||
|
||||
最新版本中已经退出历史舞台了,就不说了。
|
||||
|
||||
### 仲裁队列
|
||||
|
||||
当我们在集群中创建队列时,如【集群环境的连接】那一节,可以看到当前的队列是位于,第一个节点上的。如果这时候服务器宕机或者其他情况,这时这个节点上数据没了,其它j节点时访问不到的;我们理想的情况是,如果一台服务器宕机,这台服务器上的所有内容在别的节点上都是可以看到的。
|
||||
|
||||

|
||||
|
||||
#### 创建仲裁队列
|
||||
|
||||
**交换机:**exchange.quorum.test
|
||||
|
||||
**队列:**queue.quorum.test
|
||||
|
||||
**l路由键:**routing.key.quorum.test
|
||||
|
||||

|
||||
|
||||
#### 测试Code
|
||||
|
||||
**生产者**
|
||||
|
||||
```java
|
||||
/* 仲裁队列发送消息 */
|
||||
@Test
|
||||
void clusterQuorumTest2() {
|
||||
final String EXCHANGE = "exchange.quorum.test";
|
||||
final String ROUTING_KEY = "routing.key.quorum.test";
|
||||
|
||||
for (int i = 0; i <= 10; i++) {
|
||||
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "仲裁队列发送消息-" + i);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**消费者**
|
||||
|
||||
```java
|
||||
/* 仲裁队列监听 */
|
||||
@RabbitListener(queues = "queue.quorum.test")
|
||||
public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
|
||||
log.info("<<仲裁队列监听下消息>>----{}", dataString);
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
}
|
||||
```
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 8.3 KiB |
Binary file not shown.
After Width: | Height: | Size: 31 KiB |
|
@ -21,5 +21,13 @@ public class ClusterEnvironmentListener {
|
|||
@RabbitListener(queues = "queue.cluster.test")
|
||||
public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
|
||||
log.info("<<集群环境下消息>>----<cluster>{}", dataString);
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
}
|
||||
|
||||
/* 仲裁队列监听 */
|
||||
@RabbitListener(queues = "queue.quorum.test")
|
||||
public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
|
||||
log.info("<<仲裁队列监听下消息>>----{}", dataString);
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,4 +20,15 @@ public class ClusterEnvironmentTest {
|
|||
"集群环境下消息-" + i);
|
||||
}
|
||||
}
|
||||
|
||||
/* 仲裁队列发送消息 */
|
||||
@Test
|
||||
void clusterQuorumTest2() {
|
||||
final String EXCHANGE = "exchange.quorum.test";
|
||||
final String ROUTING_KEY = "routing.key.quorum.test";
|
||||
|
||||
for (int i = 0; i <= 10; i++) {
|
||||
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, "仲裁队列发送消息-" + i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue