联邦队列完成;

This commit is contained in:
bunny 2025-05-20 16:18:13 +08:00
parent 84c2d06b0c
commit d0fcac5455
9 changed files with 187 additions and 30 deletions

View File

@ -5,14 +5,29 @@
```bash
docker run -d --name rabbitmq_master --restart=always \
-p 5672:5672 -p 15672:15672 \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/data:/var/lib/rabbitmq_master \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/conf:/etc/rabbitmq_master \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/log:/var/log/rabbitmq_master \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/data:/var/lib/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/conf:/etc/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_VHOST=rabbitmq_master \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management
```
**后续测试联邦队列**
如果后续需要学习联邦队列可以再复制下面的内容,根据自己需求修改下端口号
```bash
docker run -d --name rabbitmq_2 --restart=always \
-p 5673:5672 -p 15673:15672 \
-v ~/docker/docker_data/rabbitmq/rabbitmq2/data:/var/lib/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq2/conf:/etc/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq2/log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_VHOST=rabbitmq2 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management
```
## 基本示例
**生产者**
@ -1695,3 +1710,139 @@ public void processMessage(String dataString, Channel channel, Message message)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
### 联邦队列
Federation插件的设计目标是使RabbitMQ在不同的Broker节点之间进行消息传递而无须建立集群。
它可以在不同的管理域中的Broker或集群间传递消息这些管理域可能设置了不同的用户和vhost也可能运行在不同版本的RabbitMQ和Erlang上。Federation基于AMQP0-9-1协议在不同的Broker之间进行通信并且设计成能够容忍不稳定的网络连接情况。
- 各节点操作:启用联邦插件
- 下游操作:
- 添加上游连接端点
- 创建控制策略
> [!NOTE]
>
> 因为联邦队列不是集群需要两个不同rabbit比如说一个服务器在上海一个在杭州两个不同的服务器且不是集群环境。
>
> 所以在创建时候不要加入集群。
#### 创建docker容器
**设置权限**
```bash
sudo chown -R 999:999 ~/docker/docker_data/rabbitmq/rabbitmq_master
sudo chown -R 999:999 ~/docker/docker_data/rabbitmq/rabbitmq_2
```
**创建容器**
```bash
# 第一个
docker run -d --name rabbitmq_master --restart=always \
-p 5672:5672 -p 15672:15672 \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/data:/var/lib/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/conf:/etc/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq_master/log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_VHOST=rabbitmq_master \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management
# 第二个
docker run -d --name rabbitmq_2 --restart=always \
-p 5673:5672 -p 15673:15672 \
-v ~/docker/docker_data/rabbitmq/rabbitmq_2/data:/var/lib/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq_2/conf:/etc/rabbitmq \
-v ~/docker/docker_data/rabbitmq/rabbitmq_2/log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_VHOST=rabbitmq_2 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management
```
#### 启动联邦插件
> [!NOTE]
>
> 联邦插件并不需要下载启动即可。
> [!IMPORTANT]
>
> 创建时候需要重新设置Virtual Hosts否则设置了内容会显示`not_allow`不能使用默认的Virtual Hosts。
```bash
# 执行下面命令进行开启
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_mangement
```
不进入docker容器直接运行方式
```bash
# 开启全部插件防止警告
docker exec -it rabbitmq_master rabbitmqctl enable_feature_flag all
docker exec -it rabbitmq_2 rabbitmqctl enable_feature_flag all
docker exec -it rabbitmq_master rabbitmq-plugins enable rabbitmq_federation
docker exec -it rabbitmq_master rabbitmq-plugins enable rabbitmq_federation_management
docker exec -it rabbitmq_2 rabbitmq-plugins enable rabbitmq_federation
docker exec -it rabbitmq_2 rabbitmq-plugins enable rabbitmq_federation_management
```
![image-20250520145425061](./images/image-20250520145425061.png)
启用成功后会有下面两个选项
![image-20250520145937321](./images/image-20250520145937321.png)
在此页面中添加
![image-20250520150530165](./images/image-20250520150530165.png)
上图输入框中的内容
```bash
# 连接名称
bunny.upstream
# 连接地址
amqp://admin:admin@172.17.0.1:5672
```
创建Policies
![image-20250520150907385](./images/image-20250520150907385.png)
上图输入框内容
```bash
name: policy.federation.exchange
Pattern: ^federation\.
Apply to: Exchanges
Priority: 10
# 这里的federation-upstream 是我们之前创建的联邦名称,详细看上面【上图的,输入框中的内容】
federation-upstream bunny.upstream
```
#### 创建交换机
> [!IMPORTANT]
>
> 创建联邦交换机时,需要匹配上面写的`Pattern`,如果按照文档创建,那么名称需要是:`federation.xxx.xxxxx`这种格式,反正肯定是以`federation`开头的。
>
> 其次路由键要一致,队列名称可以不一样。
| 所在机房 | 交换机名称 | 路由键 | 队列名称 |
| ----------------------------- | ------------------------ | --------------------- | ---------------------- |
| rabbitmq_master你的上游MQ | federation.exchange.demo | routing.key.demo.test | queue.normal.master |
| rabbitmq2你的下游MQ | federation.exchange.demo | routing.key.demo.test | queue.normal.rabbitmq2 |
> [!NOTE]
>
> 但此时发现下游节点中联邦队列并没有接收到消息,这是为什么呢?这里就体现出了联邦队列和联邦交换机工作逻辑的区别。
> 对联邦队列来说,如果没有监听联邦队列的消费端程序,它是不会到上游去拉取消窶的!
> 如果有消费端监听联邦队列,那么首先消费联邦队列自身的消息;如果联邦队列为空,这时候才会到上游队列节点中拉取消息。
> 所以现在的测试效果需要消费端程序配合才能看到:

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

View File

@ -1,33 +1,23 @@
package cn.bunny.mq.mqdemo.mq.listener;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class ClusterEnvironmentListener {
@Resource
private RabbitTemplate rabbitTemplate;
/* 测试优先级队列 */
@RabbitListener(queues = "queue.cluster.test")
public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
log.info("<<集群环境下消息>>----<cluster>{}", dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/* 仲裁队列监听 */
@RabbitListener(queues = "queue.quorum.test")
public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
log.info("<<仲裁队列监听下消息>>----{}", dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
// /* 测试优先级队列 */
// @RabbitListener(queues = "queue.cluster.test")
// public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
// log.info("<<集群环境下消息>>----<cluster>{}", dataString);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// }
//
// /* 仲裁队列监听 */
// @RabbitListener(queues = "queue.quorum.test")
// public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
// log.info("<<仲裁队列监听下消息>>----{}", dataString);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// }
}

View File

@ -1,8 +1,13 @@
package cn.bunny.mq.mqdemo.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class MessageListenerOrder {
@ -87,4 +92,11 @@ public class MessageListenerOrder {
// log.info("<<优先级队列>>----<priority>{}", dataString);
// }
/* 测试联邦队列消息 */
@RabbitListener(queues = "queue.normal.rabbitmq2")
public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException {
log.info("<<测试联邦队列消息>>----<priority>{}", dataString);
}
}

View File

@ -3,10 +3,14 @@ server:
bunny:
rabbitmq:
host: 192.168.3.144
# port: 5672
# 集群环境的端口号
port: 22222
# # 集群环境
# host: 192.168.3.144
# port: 22222 # 集群环境的端口号
# 联邦队列---需要下游的
host: 192.168.3.148
port: 5673
virtual-host: /
username: admin
password: admin