RabbitMQ官网:https://www.rabbitmq.com/。
官方文档:https://rabbitmq.com/documentation.html。
官方Demo:https://www.rabbitmq.com/getstarted.html。
部署 RabbitMQ
使用Docker来部署RabbitMQ只需要两个步骤:
-
pull
RabbitMQ镜像:docker pull rabbitmq:3-management
RabbitMQ通过插件的方式提供了一个可视化界面。在RabbitMQ官方的镜像中,带
management
版本的就是集成了该可视化插件的镜像。通过Docker官方的镜像源来
pull
RabbitMQ镜像可能会有点慢,可以使用一些国内的镜像源:提供方 |镜像URL 中国区官方镜像 |
https://registry.docker-cn.com
清华源 |https://docker.mirrors.ustc.edu.cn
阿里源 |https://cr.console.aliyun.com
腾讯源 |https://mirror.ccs.tencentyun.com
网易源 |http://hub-mirror.c.163.com
道客镜像站 |http://f1361db2.m.daocloud.io
-
启动RabbitMQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=linner \ # RabbitMQ用户名(默认为guest) -e RABBITMQ_DEFAULT_PASS=123456 \ # RabbitMQ密码(默认为guest) --name rabbitmq-server \ --hostname rabbitmq \ # 域名,单机下可忽略,通常用于集群部署时 -p 15672:15672 \ # 可视化管理界面端口 -p 5672:5672 \ # 服务端口 -d \ rabbitmq:3-management
启动容器后,进入http://localhost:15672/即可进入RabbitMQ管理界面。
使用以下命令查看RabbitMQ日志:
docker logs rabbitmq-server # 替换成你容器的名称
RabbitMQ 结构
RabbitMQ的基本结构如下:
在RabbitMQ中有几个概念:
channel
:操作RabbitMQ的工具。exchange
:路由消息到队列中。queue
:缓存消息。virtual host
:虚拟主机,是对queue
、exchange
等资源的逻辑分组。
RabbitMQ中的角色主要有:
- Publisher:生产者。将消息发送给Queue(队列)或Exchange(交换机)。
- Consumer:消费者。从Queue获取消息,并做响应的处理。
- Exchange:交换机。负责消息路由,将从Publisher接受到的消息分发到不同的Queues上。
- Queue:队列。用于从Publishers或Exchange接收消息并存储。
- Virtual Host:虚拟主机。隔离不同用户的Exchange、Queue,消息的隔离。
消息模型
RabbitMQ官方文档中给出了5个Demo,它们分别对应几种不同的用法:
-
基本消息队列(Basic Queue):
-
工作消息队列(Work Queue):
-
发布与订阅(Publish and Subscribe)。
根据交换机类型的不同,可分为三种:
-
Fanout Exchange(广播):
-
Direct Exchange(路由):
-
Topic Exchange(主题):
-
RabbitMQ 依赖
使用RabbitMQ之前需要先导入依赖。可以导入Spring AMQP的依赖,其中包含了RabbitMQ的依赖:
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
对于RabbitMQ来说,Publisher和Consumer都是使用这个依赖,所以这个依赖坐标可以放在它们的父工程中。
基本消息队列
最基础的消息队列模型只包含三种角色:
- Publisher:消息发布者,将消息发送到队列Queue。
- Queue:消息队列,负责接受并缓存Publisher发送的消息。
- Consumer:订阅队列,处理Queue中的消息。
Queue由RabbitMQ实现,需要由我们实现的角色为Publisher和Consumer。
实现基本消息队列可以使用RabbitMQ的原生接口来模拟。创建两个Spring模块Publisher
和Consumer
来模拟消息的发送和接收:
-
在
Publisher
模块中创建PublisherTest
测试类来模拟消息发送:public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1. 设置连接参数 factory.setHost("localhost"); // 主机地址 factory.setPort(5672); // 端口号 factory.setVirtualHost("/"); // 虚拟主机 factory.setUsername("linner"); // 账号 factory.setPassword("123456"); // 密码 // 1.2. 建立连接 Connection connection = factory.newConnection(); // 2. 创建通道Channel Channel channel = connection.createChannel(); // 3. 创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4. 发送消息 String message = "Hello RabbitMQ!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:[" + message + "]"); // 5. 关闭通道和连接 channel.close(); connection.close(); } }
-
在
Consumer
模块中创建ConsumerTest
来模拟消息接收:注:接收消息需要在
main()
中创建回调函数。不能使用一般的测试方法。public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1. 创建连接工厂,建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1. 设置连接参数 factory.setHost("localhost"); // 主机地址 factory.setPort(5672); // 端口号 factory.setVirtualHost("/"); // 虚拟主机 factory.setUsername("linner"); // 用户名 factory.setPassword("123456"); // 密码 // 1.2. 建立连接 Connection connection = factory.newConnection(); // 2. 创建通道Channel Channel channel = connection.createChannel(); // 3. 创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4. 订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5. 处理消息 String message = new String(body); System.out.println("接收到消息:[" + message + "]"); } }); System.out.println("等待接收消息......"); } }
模拟过程:
-
先给
testSendMessage()
打个断点,然后再以调试模式启动PublisherTest
,逐条执行。可以在RabbitMQ管理界面中发现创建了对应的连接、通道和消息队列。并且,进入队列后可以发现有一条消息在准备中。MQ连接:
MQ通道:
消息队列(Read项为1,表示有1条准备消息):
准备中的消息:
-
此时启动
Consumer
,可以发现有一条新的连接和新的通道。并且消息队列中的准备消息变为0。新的MQ连接:
新的MQ通道:
消息队列:
在
Consumer
的控制台中可以发现打印了一条新消息:接收到消息:[Hello RabbitMQ!]
-
再次启动
PublisherTest
,Consumer
控制台再次打印一条消息:接收到消息:[Hello RabbitMQ!]
在搭建基本消息队列的过程中,创建队列之前的操作对于Publisher和Consumer来说是一致的。建立连接这个过程对于Publisher和Consumer来说都是必要的,之所以它们都要创建队列,是为了防止队列不存在的情况发生。它们的流程中不同的是Publisher创建队列后是为了发送消息,Consumer创建队列后是为了接收消息。
创建基本消息队列的流程大致可总结为:
- 建立连接(Connection)。
- 创建连接工厂;
- 设置连接参数;
- 最后再建立连接。
- 利用Connection创建通道(Channel)。
- 利用Channel声明消息队列。
- 利用Channel在指定的消息队列中发送消息或订阅消息。
- 对于Publisher来说,需要在消息队列中发送消息。发送完消息后,需要按顺序关闭通道和连接。
- 对于Consumer来说,需要在消息队列中接收消息。
评论