RabbitMQ官网:https://www.rabbitmq.com/

官方文档:https://rabbitmq.com/documentation.html

官方Demo:https://www.rabbitmq.com/getstarted.html


部署 RabbitMQ

使用Docker来部署RabbitMQ只需要两个步骤:

  1. pull RabbitMQ镜像:

    docker pull rabbitmq:3-management
    

    RabbitMQ通过插件的方式提供了一个可视化界面。在RabbitMQ官方的镜像中,带management版本的就是集成了该可视化插件的镜像。

    通过Docker官方的镜像源来pull RabbitMQ镜像可能会有点慢,可以使用一些国内的镜像源:

    提供方 |镜像URL 中国区官方镜像 |https://registry.docker-cn.com 清华源 |https://docker.mirrors.ustc.edu.cn 阿里源 |https://cr.console.aliyun.com 腾讯源 |https://mirror.ccs.tencentyun.com 网易源 |http://hub-mirror.c.163.com 道客镜像站 |http://f1361db2.m.daocloud.io

  2. 启动RabbitMQ容器:

    docker run \
        -e RABBITMQ_DEFAULT_USER=linner \   # RabbitMQ用户名(默认为guest)
        -e RABBITMQ_DEFAULT_PASS=123456 \   # RabbitMQ密码(默认为guest)
        --name rabbitmq-server \
        --hostname rabbitmq \               # 域名,单机下可忽略,通常用于集群部署时
        -p 15672:15672 \                    # 可视化管理界面端口
        -p 5672:5672 \                      # 服务端口
        -d \
        rabbitmq:3-management
    

启动容器后,进入http://localhost:15672/即可进入RabbitMQ管理界面。

使用以下命令查看RabbitMQ日志:

docker logs rabbitmq-server # 替换成你容器的名称

RabbitMQ 结构

RabbitMQ的基本结构如下:

RabbitMQ的基本结构

在RabbitMQ中有几个概念:

  • channel:操作RabbitMQ的工具。
  • exchange:路由消息到队列中。
  • queue:缓存消息。
  • virtual host:虚拟主机,是对queueexchange等资源的逻辑分组。

RabbitMQ中的角色主要有:

  • Publisher:生产者。将消息发送给Queue(队列)或Exchange(交换机)。
  • Consumer:消费者。从Queue获取消息,并做响应的处理。
  • Exchange:交换机。负责消息路由,将从Publisher接受到的消息分发到不同的Queues上。
  • Queue:队列。用于从Publishers或Exchange接收消息并存储。
  • Virtual Host:虚拟主机。隔离不同用户的Exchange、Queue,消息的隔离。

消息模型

RabbitMQ官方文档中给出了5个Demo,它们分别对应几种不同的用法:

  • 基本消息队列(Basic Queue):

    Basic Queue

  • 工作消息队列(Work Queue):

    Work Queue

  • 发布与订阅(Publish and Subscribe)。

    根据交换机类型的不同,可分为三种:

    • Fanout Exchange(广播):

      Fanout Exchange

    • Direct Exchange(路由):

      Direct Exchange

    • Topic Exchange(主题):

      Topic Exchange


RabbitMQ 依赖

使用RabbitMQ之前需要先导入依赖。可以导入Spring AMQP的依赖,其中包含了RabbitMQ的依赖:

<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

对于RabbitMQ来说,Publisher和Consumer都是使用这个依赖,所以这个依赖坐标可以放在它们的父工程中。


基本消息队列

最基础的消息队列模型只包含三种角色:

  • Publisher:消息发布者,将消息发送到队列Queue。
  • Queue:消息队列,负责接受并缓存Publisher发送的消息。
  • Consumer:订阅队列,处理Queue中的消息。

Queue由RabbitMQ实现,需要由我们实现的角色为Publisher和Consumer。

实现基本消息队列可以使用RabbitMQ的原生接口来模拟。创建两个Spring模块PublisherConsumer来模拟消息的发送和接收:

  • Publisher模块中创建PublisherTest测试类来模拟消息发送:

    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
    
            // 1. 建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1. 设置连接参数
            factory.setHost("localhost");   // 主机地址
            factory.setPort(5672);          // 端口号
            factory.setVirtualHost("/");    // 虚拟主机
            factory.setUsername("linner");  // 账号
            factory.setPassword("123456");  // 密码
            // 1.2. 建立连接
            Connection connection = factory.newConnection();
    
            // 2. 创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3. 创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4. 发送消息
            String message = "Hello RabbitMQ!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:[" + message + "]");
    
            // 5. 关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    
  • Consumer模块中创建ConsumerTest来模拟消息接收:

    注:接收消息需要在main()中创建回调函数。不能使用一般的测试方法。

    public class  ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 1. 创建连接工厂,建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1. 设置连接参数
            factory.setHost("localhost");   // 主机地址
            factory.setPort(5672);          // 端口号
            factory.setVirtualHost("/");    // 虚拟主机
            factory.setUsername("linner");  // 用户名
            factory.setPassword("123456");  // 密码
            // 1.2. 建立连接
            Connection connection = factory.newConnection();
    
            // 2. 创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3. 创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4. 订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5. 处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:[" + message + "]");
                }
            });
    
            System.out.println("等待接收消息......");
        }
    }
    

模拟过程:

  1. 先给testSendMessage()打个断点,然后再以调试模式启动PublisherTest,逐条执行。可以在RabbitMQ管理界面中发现创建了对应的连接、通道和消息队列。并且,进入队列后可以发现有一条消息在准备中。

    MQ连接:

    RabbitMQ 连接

    MQ通道:

    RabbitMQ 通道

    消息队列(Read项为1,表示有1条准备消息):

    消息队列

    准备中的消息:

    准备中的消息

  2. 此时启动Consumer,可以发现有一条新的连接和新的通道。并且消息队列中的准备消息变为0。

    新的MQ连接:

    新的 RabbitMQ 连接

    新的MQ通道:

    新的 RabbitMQ 通道

    消息队列:

    消息队列

    Consumer的控制台中可以发现打印了一条新消息:

    接收到消息:[Hello RabbitMQ!]
    
  3. 再次启动PublisherTestConsumer控制台再次打印一条消息:

    接收到消息:[Hello RabbitMQ!]
    

在搭建基本消息队列的过程中,创建队列之前的操作对于Publisher和Consumer来说是一致的。建立连接这个过程对于Publisher和Consumer来说都是必要的,之所以它们都要创建队列,是为了防止队列不存在的情况发生。它们的流程中不同的是Publisher创建队列后是为了发送消息,Consumer创建队列后是为了接收消息。

创建基本消息队列的流程大致可总结为:

  1. 建立连接(Connection)。
    1. 创建连接工厂;
    2. 设置连接参数;
    3. 最后再建立连接。
  2. 利用Connection创建通道(Channel)。
  3. 利用Channel声明消息队列。
  4. 利用Channel在指定的消息队列中发送消息或订阅消息。
    • 对于Publisher来说,需要在消息队列中发送消息。发送完消息后,需要按顺序关闭通道和连接。
    • 对于Consumer来说,需要在消息队列中接收消息。