diff --git a/mq-demo/ReadMe.md b/mq-demo/ReadMe.md index 5ab7214..9d9be79 100644 --- a/mq-demo/ReadMe.md +++ b/mq-demo/ReadMe.md @@ -5,14 +5,29 @@ ```bash docker run -d --name rabbitmq_master --restart=always \ -p 5672:5672 -p 15672:15672 \ --v ~/docker/docker_data/rabbitmq/rabbitmq_master/data:/var/lib/rabbitmq_master \ --v ~/docker/docker_data/rabbitmq/rabbitmq_master/conf:/etc/rabbitmq_master \ --v ~/docker/docker_data/rabbitmq/rabbitmq_master/log:/var/log/rabbitmq_master \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_master/data:/var/lib/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_master/conf:/etc/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_master/log:/var/log/rabbitmq \ -e RABBITMQ_DEFAULT_VHOST=rabbitmq_master \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management ``` +**后续测试联邦队列** + +如果后续需要学习联邦队列可以再复制下面的内容,根据自己需求修改下端口号 + +```bash +docker run -d --name rabbitmq_2 --restart=always \ +-p 5673:5672 -p 15673:15672 \ +-v ~/docker/docker_data/rabbitmq/rabbitmq2/data:/var/lib/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq2/conf:/etc/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq2/log:/var/log/rabbitmq \ +-e RABBITMQ_DEFAULT_VHOST=rabbitmq2 \ +-e RABBITMQ_DEFAULT_USER=admin \ +-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management +``` + ## 基本示例 **生产者** @@ -1695,3 +1710,139 @@ public void processMessage(String dataString, Channel channel, Message message) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } ``` + +### 联邦队列 + +Federation插件的设计目标是使RabbitMQ在不同的Broker节点之间进行消息传递而无须建立集群。 + +它可以在不同的管理域中的Broker或集群间传递消息,这些管理域可能设置了不同的用户和vhost,也可能运行在不同版本的RabbitMQ和Erlang上。Federation基于AMQP0-9-1协议在不同的Broker之间进行通信,并且设计成能够容忍不稳定的网络连接情况。 + +- 各节点操作:启用联邦插件 +- 下游操作: + - 添加上游连接端点 + - 创建控制策略 + +> [!NOTE] +> +> 因为联邦队列不是集群,需要两个不同rabbit,比如说,一个服务器在上海,一个在杭州,两个不同的服务器且不是集群环境。 +> +> 所以在创建时候不要加入集群。 + +#### 创建docker容器 + +**设置权限** + +```bash +sudo chown -R 999:999 ~/docker/docker_data/rabbitmq/rabbitmq_master +sudo chown -R 999:999 ~/docker/docker_data/rabbitmq/rabbitmq_2 +``` + +**创建容器** + +```bash +# 第一个 +docker run -d --name rabbitmq_master --restart=always \ +-p 5672:5672 -p 15672:15672 \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_master/data:/var/lib/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_master/conf:/etc/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_master/log:/var/log/rabbitmq \ +-e RABBITMQ_DEFAULT_VHOST=rabbitmq_master \ +-e RABBITMQ_DEFAULT_USER=admin \ +-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management + +# 第二个 +docker run -d --name rabbitmq_2 --restart=always \ +-p 5673:5672 -p 15673:15672 \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_2/data:/var/lib/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_2/conf:/etc/rabbitmq \ +-v ~/docker/docker_data/rabbitmq/rabbitmq_2/log:/var/log/rabbitmq \ +-e RABBITMQ_DEFAULT_VHOST=rabbitmq_2 \ +-e RABBITMQ_DEFAULT_USER=admin \ +-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13.7-management +``` + +#### 启动联邦插件 + +> [!NOTE] +> +> 联邦插件并不需要下载启动即可。 + +> [!IMPORTANT] +> +> 创建时候需要重新设置Virtual Hosts否则设置了内容会显示`not_allow`,不能使用默认的Virtual Hosts。 + +```bash +# 执行下面命令进行开启 +rabbitmq-plugins enable rabbitmq_federation +rabbitmq-plugins enable rabbitmq_federation_mangement +``` + +不进入docker容器直接运行方式 + +```bash +# 开启全部插件防止警告 +docker exec -it rabbitmq_master rabbitmqctl enable_feature_flag all +docker exec -it rabbitmq_2 rabbitmqctl enable_feature_flag all + +docker exec -it rabbitmq_master rabbitmq-plugins enable rabbitmq_federation +docker exec -it rabbitmq_master rabbitmq-plugins enable rabbitmq_federation_management + +docker exec -it rabbitmq_2 rabbitmq-plugins enable rabbitmq_federation +docker exec -it rabbitmq_2 rabbitmq-plugins enable rabbitmq_federation_management +``` + +![image-20250520145425061](./images/image-20250520145425061.png) + +启用成功后会有下面两个选项 + +![image-20250520145937321](./images/image-20250520145937321.png) + +在此页面中添加 + +![image-20250520150530165](./images/image-20250520150530165.png) + +上图输入框中的内容 + +```bash +# 连接名称 +bunny.upstream + +# 连接地址 +amqp://admin:admin@172.17.0.1:5672 +``` + +创建Policies + +![image-20250520150907385](./images/image-20250520150907385.png) + +上图输入框内容 + +```bash +name: policy.federation.exchange +Pattern: ^federation\. +Apply to: Exchanges +Priority: 10 + +# 这里的federation-upstream 是我们之前创建的联邦名称,详细看上面【上图的,输入框中的内容】 +federation-upstream bunny.upstream +``` + +#### 创建交换机 + +> [!IMPORTANT] +> +> 创建联邦交换机时,需要匹配上面写的`Pattern`,如果按照文档创建,那么名称需要是:`federation.xxx.xxxxx`这种格式,反正肯定是以`federation`开头的。 +> +> 其次路由键要一致,队列名称可以不一样。 + +| 所在机房 | 交换机名称 | 路由键 | 队列名称 | +| ----------------------------- | ------------------------ | --------------------- | ---------------------- | +| rabbitmq_master(你的上游MQ) | federation.exchange.demo | routing.key.demo.test | queue.normal.master | +| rabbitmq2(你的下游MQ) | federation.exchange.demo | routing.key.demo.test | queue.normal.rabbitmq2 | + +> [!NOTE] +> +> 但此时发现下游节点中联邦队列并没有接收到消息,这是为什么呢?这里就体现出了联邦队列和联邦交换机工作逻辑的区别。 +> 对联邦队列来说,如果没有监听联邦队列的消费端程序,它是不会到上游去拉取消窶的! +> 如果有消费端监听联邦队列,那么首先消费联邦队列自身的消息;如果联邦队列为空,这时候才会到上游队列节点中拉取消息。 +> 所以现在的测试效果需要消费端程序配合才能看到: diff --git a/mq-demo/images/image-20250520145425061.png b/mq-demo/images/image-20250520145425061.png new file mode 100644 index 0000000..53705d4 Binary files /dev/null and b/mq-demo/images/image-20250520145425061.png differ diff --git a/mq-demo/images/image-20250520145937321.png b/mq-demo/images/image-20250520145937321.png new file mode 100644 index 0000000..100cd04 Binary files /dev/null and b/mq-demo/images/image-20250520145937321.png differ diff --git a/mq-demo/images/image-20250520150526785.png b/mq-demo/images/image-20250520150526785.png new file mode 100644 index 0000000..755b763 Binary files /dev/null and b/mq-demo/images/image-20250520150526785.png differ diff --git a/mq-demo/images/image-20250520150530165.png b/mq-demo/images/image-20250520150530165.png new file mode 100644 index 0000000..755b763 Binary files /dev/null and b/mq-demo/images/image-20250520150530165.png differ diff --git a/mq-demo/images/image-20250520150907385.png b/mq-demo/images/image-20250520150907385.png new file mode 100644 index 0000000..41879ea Binary files /dev/null and b/mq-demo/images/image-20250520150907385.png differ diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java index 74d5761..06cc403 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/ClusterEnvironmentListener.java @@ -1,33 +1,23 @@ package cn.bunny.mq.mqdemo.mq.listener; -import com.rabbitmq.client.Channel; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; -import java.io.IOException; - @Component @Slf4j public class ClusterEnvironmentListener { - @Resource - private RabbitTemplate rabbitTemplate; - - /* 测试优先级队列 */ - @RabbitListener(queues = "queue.cluster.test") - public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException { - log.info("<<集群环境下消息>>----{}", dataString); - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); - } - - /* 仲裁队列监听 */ - @RabbitListener(queues = "queue.quorum.test") - public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException { - log.info("<<仲裁队列监听下消息>>----{}", dataString); - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); - } + // /* 测试优先级队列 */ + // @RabbitListener(queues = "queue.cluster.test") + // public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + // log.info("<<集群环境下消息>>----{}", dataString); + // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + // } + // + // /* 仲裁队列监听 */ + // @RabbitListener(queues = "queue.quorum.test") + // public void processMessage(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + // log.info("<<仲裁队列监听下消息>>----{}", dataString); + // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + // } } diff --git a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java index f61fcac..1fe524f 100644 --- a/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java +++ b/mq-demo/src/main/java/cn/bunny/mq/mqdemo/mq/listener/MessageListenerOrder.java @@ -1,8 +1,13 @@ package cn.bunny.mq.mqdemo.mq.listener; +import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import java.io.IOException; + @Component @Slf4j public class MessageListenerOrder { @@ -87,4 +92,11 @@ public class MessageListenerOrder { // log.info("<<优先级队列>>----{}", dataString); // } + + /* 测试联邦队列消息 */ + @RabbitListener(queues = "queue.normal.rabbitmq2") + public void processMessagePriority(String dataString, Channel channel, Message message) throws IOException, InterruptedException { + log.info("<<测试联邦队列消息>>----{}", dataString); + } + } diff --git a/mq-demo/src/main/resources/application-dev.yml b/mq-demo/src/main/resources/application-dev.yml index 4513150..8627a00 100644 --- a/mq-demo/src/main/resources/application-dev.yml +++ b/mq-demo/src/main/resources/application-dev.yml @@ -3,10 +3,14 @@ server: bunny: rabbitmq: - host: 192.168.3.144 - # port: 5672 - # 集群环境的端口号 - port: 22222 + # # 集群环境 + # host: 192.168.3.144 + # port: 22222 # 集群环境的端口号 + + # 联邦队列---需要下游的 + host: 192.168.3.148 + port: 5673 + virtual-host: / username: admin password: admin