AMQP(Advanced Message Queuing Protocol)是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

RabbitMQ实现了AMQP协议。Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

Spring AMQP的依赖如下,其中包含了RabbitMQ的依赖:

<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在Spring中使用RabbitMQ可以使用Spring AMQP的依赖。对于RabbitMQ来说,Publisher和Consumer都是使用这个依赖,所以这个依赖坐标可以放在它们的父工程中。

导入依赖后,需要先编写application.yml,添加RabbitMQ连接信息(Publisher和Consumer都需要编写此配置):

spring:
  rabbitmq:   # RabbitMQ配置信息
    host: localhost   # 主机地址
    port: 5672        # 端口
    virtual-host: /   # 虚拟主机
    username: linner  # 用户名
    password: 123456  # 密码

RabbitMQ官方文档中给出了5个Demo,它们分别对应几种不同的用法:

  • 基本消息队列(Basic Queue):

    Basic Queue

  • 工作消息队列(Work Queue):

    Work Queue

  • 发布与订阅(Publish/Subscribe)。

    根据交换机类型的不同,可分为三种:

    • Fanout Exchange(广播):

      Fanout Exchange

    • Direct Exchange(路由,Routing):

      Direct Exchange

    • Topic Exchange(主题):

      Topic Exchange

接下来分别按照这5个模型来实现。


基本消息队列

基本消息队列模型只包含三种角色:

  • Publisher:消息发布者,将消息发送到队列Queue。
  • Queue:消息队列,负责接受并缓存消息。
  • Consumer:订阅队列,处理队列中的消息。

Spring AMQP通过RabbitMQ提供的AMQP接口,整合了RabbitMQ,让RabbitMQ使用起来更加方便。

Spring AMQP提供了RabbitTemplate来给RabbitMQ发送消息,在使用之时直接@Autowired即可。

Publisher模块中新建一个SpringAmqpTest测试类,然后使用RabbitTemplate发送消息:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        String queueName = "simple.queue";
        String message = "Hello Spring AMQP!";
        rabbitTemplate.convertAndSend(queueName, message);
        log.info("发送消息:[" + message + "]");
    }
}

将以上代码放在业务代码中即可实现异步通讯。

Consumer中新建一个listener包,并且在包中创建SpringRabbitListener,然后使用@RabbitListener定义接收消息的方法:

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = {"simple.queue"})
    public void listenSimpleQueue(String msg) {
        log.info("收到消息:[" + msg + "]");
    }
}

直接启动Consumer的启动类即可开始接收消息。

注:@RabbitListenerqueues属性没有创建消息队列的能力。如果先启动Consumer报错Failed to declare queue: simple.queue,可以创建一个RabbitConfig,然后在RabbitConfig中创建队列;或者直接在启动类中创建队列。

Consumer中创建simple.queue

@Configuration
public class RabbitConfig {

    @Bean
    public Queue getSimpleQueue() {
        return new Queue("simple.queue");
    }
}

另外还需要注意,导入的Queueorg.springframework.amqp.core包下的。


工作队列模型

修改Publisher,模拟每秒钟发送50条消息:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testWorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        String message = "Hello Message__";
        for (int i = 0; i < 50; i++) {
            String msg = message + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            log.info("发送消息:[" + msg + "]");
            Thread.sleep(20);   // 休眠 20ms,1s == 1000ms,所以1s最多只能发50条
        }
    }
}

修改Consumer,模拟两个消费者共同接收work.queue的消息:

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queuesToDeclare = {@Queue(WORK_QUEUE)}) // 声明并订阅Queue
    public void listenWorkQueue1(String msg) throws InterruptedException {
        log.warn("消费者 1 收到消息:[" + msg + "]");   // 用不同的日志级别区分消费者1和消费者2,更易于阅读
        Thread.sleep(20);   // 处理速度约:50条/s
    }

    @RabbitListener(queuesToDeclare = {@Queue(WORK_QUEUE)})
    public void listenWorkQueue2(String msg) throws InterruptedException {
        log.error("消费者 2 收到消息:[" + msg + "]");
        Thread.sleep(200);  // 处理速度约:5条/s
    }
}

@RabbitListenerqueuesToDeclare属性是@Queue类型数组。queuesToDeclare在指定多个队列的同时,会去声明它们,这样就能不通过创建Bean来声明它们。

先启动Consumer,然后在启动Publisher,发现消息被两个消费者平分处理了(一个消费者处理单数号消息,另一个处理双数号消息),并没有按照各自处理速度去分配。这是因为消费者存在消费预取。无论在规定时间内是否能处理得完,消费者都会按照配置去自动预取。所以就导致了两个消费者各取一半消息去处理。

要解决这个消费预取限制,让消费者能根据自己的能力预取消息,可以在application.yml中修改spring.rebbitmq.listener.simple.prefetch

spring:
  rabbitmq:   # RabbitMQ配置信息
    ##### 连接信息... #####
    listener:
      simple:
        prefetch: 1 # 每次只能预取 1 条消息,处理完才能获取下一条

然后按照上方,先启动(或重启)Consumer,然后在启动Publisher,观察控制台打印的日志可以发现这次处理完50条消息的时间大约为1秒。

工作队列模式的特点是:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
  • 通过设置Prefetch来控制消费者预取的消息数量。

发布/订阅模型

发布/订阅模式通过Exchange(交换机)方式实现了将同一消息发送给多个消费者。

发布/订阅的模型如图:

发布/订阅模型

模型中多了Exchange角色,并且过程略有变化:

  • Publisher(生产者):值要发送消息的程序,但是不再发送到队列中,而是发给Exchange(交换机)。
  • Exchange(交换机):一方面,接收生产者发送的消息;另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
    • Topic:
  • Consumer(消费者):订阅队列,处理队列中的消息。。
  • Queue:接收消息、缓存消息。

注:Exchange负责消息路由,而不是存储,路由失败则消息丢失。

常见Exchange类型包括:

  • Fanout(扇出):广播,将消息交给所有绑定到交换机的队列。
  • Direct(定向):路由,把消息交给符合指定Routing Key的队列。
  • Topic(话题):通配符,把消息交给符合Routing Pattern(路由模式)的队列。

Spring AMQP提供了声明交换机、队列、绑定关系的API,声明交换机API的继承关系图如下:

交换机API继承关系图

Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的Queue。声明队列、声明交换机和绑定关系都要在Consumer中完成。

Consumer中添加新的配置类:

@Configuration
public class FanoutConfig {

    public static final String FANOUT_EXCHANGE = "linner.fanout";   // 交换机名称
    public static final String FANOUT_QUEUE_1 = "fanout.queue1";    // 队列1名称
    public static final String FANOUT_QUEUE_2 = "fanout.queue2";    // 队列2名称

    /**
     * 声明FanoutExchange交换机
     */
    @Bean
    public FanoutExchange getFanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * 声明第1个队列
     */
    @Bean("fanoutQueue1")
    public Queue getFanoutQueue1() {
        return new Queue(FANOUT_QUEUE_1);
    }

    /**
     * 绑定队列1和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean("fanoutQueue2")
    public Queue getFanoutQueue2() {
        return new Queue(FANOUT_QUEUE_2);
    }

    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

    /* 第3个队列以相同方式声明,并绑定... */
}

然后修改ConsumerSpringRabbitListener类:

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_1})
    public void listenFanoutQueue1(String msg) {
        log.info("消费者收到 [" + FanoutConfig.FANOUT_QUEUE_1 + "] 的消息: [" + msg + "]");
    }

    @RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_2})
    public void listenFanoutQueue2(String msg) {
        log.info("消费者收到 [" + FanoutConfig.FANOUT_QUEUE_2 + "] 的消息: [" + msg + "]");
    }
}

修改Publisher,添加新的测试方法模拟发送消息:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String FANOUT_EXCHANGE = "linner.fanout";  // 交换机名称

    @Test
    public void testSendFanoutExchange() {
        String message = "Hello everyone!"; // 消息
        // 发送消息
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, "", message);
        log.info("发送消息:[" + message + "]");
    }
}

先运行Consumer,然后再运行PublishertestSendFanoutExchange()测试方法发送消息,在Consumer的终端可以发现,两个消费者都接收到消息了。

RabbitTemplateconvertAndSend()方法有不同的实现,可以向队列发送消息,也可以向交换机发送。

Fanout Exchange的特点:

  • 可以有多个队列;
  • 每个队列都要绑定到Exchange;
  • 生产者发送的消息,只能发送到Exchange,由Exchange决定发给哪个队列,生产者无法决定(也不知道);
  • 交换机把消息发送给绑定过的所有队列;
  • 订阅队列的消费者都能拿到消息。

Fanout Exchange的作用:

  • 接收Publisher发送的消息;
  • 将消息按照规则路由到与之绑定的队列,Fanout Exchange的会将消息路由到每个绑定的队列;
  • 不能缓存消息,路由失败,消息丢失。

Direct Exchange

Direct Exchange通过Binding Key(绑定密钥)和Routing Key(路由密钥)将消息路由到不同的队列上。

在Direct Exchange模式下,队列在声明时可以指定一个或多个Binding Key;Publisher在向Exchange发送消息时,通过指定Routing Key,告诉Exchange要将消息发送到哪个队列上。

Consumer中添加新的listen方法:

@Slf4j
@Component
public class SpringRabbitListener {

    private static final String DIRECT_EXCHANGE_NAME = "linner.direct";
    private static final String DIRECT_QUEUE_1_NAME = "direct.queue1";
    private static final String DIRECT_QUEUE_2_NAME = "direct.queue2";

    @RabbitListener(bindings = {@QueueBinding(   // 设置绑定关系,绑定队列
            value = @Queue(DIRECT_QUEUE_1_NAME),    // 设置绑定的队列(会自动创建队列)
            exchange = @Exchange(name = DIRECT_EXCHANGE_NAME,   // 设置绑定的交换机(会自动创建交换机)
                    type = ExchangeTypes.DIRECT /* 设置交换机类型,默认为Direct */),
            key = {"red", "blue"}   // Binding Keys,可以指定多个,当
    )})
    public void listenDirectQueue1(String msg) {
        log.info("消费者收到 [" + DIRECT_QUEUE_1_NAME + "] 的消息: [" + msg + "]");
    }

    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue(DIRECT_QUEUE_2_NAME),
            exchange = @Exchange(name = DIRECT_EXCHANGE_NAME,
                    type = ExchangeTypes.DIRECT /* 默认 */),
            key = {"red", "yellow"}
    )})
    public void listenDirectQueue2(String msg) {
        log.info("消费者收到 [" + DIRECT_QUEUE_2_NAME + "] 的消息: [" + msg + "]");
    }
}

@RabbitListenerbindings属性提供了简便的绑定方式。bindings属性是@QueueBinding类型数组,可以同时有多个绑定。其中的每个绑定使用@QueueBinding来定义。

@QueueBinding通常需要设置的属性有以下三个:

  • value:指明绑定的队列,类型为@Queue。通过使用@Queue来声明并订阅队列。
  • exchange:指明绑定的交换机,类型为@Exchange。通过使用@Exchange,可以指定交换机的名称和类型。交换机类型可以使用ExchangeTypes下的字符串常量。
  • key:指明Binding Key(s),是一个字符串数组类型。

Publisher中添加新的testSend方法:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String DIRECT_EXCHANGE = "linner.direct";  // 交换机名称

    @Test
    public void testSendDirectExchange() {
        String routingKey = "red";
        String message = "Hello direct exchange " + routingKey + "!"; // 消息
        // 发送消息(发送消息时需要指定Routing Key)
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, routingKey, message);
        log.info("发送消息:[" + message + "]");
    }
}

先启动Consumer,然后再启动PublishertestSendDirectExchange()方法来进行测试。你可以通过将routingKey修改为blueyellow来测试其它路由密钥。

Direct Exchange的规则是这样的:

  • Queue绑定一个或多个Binding Key(s)到Exchange。
  • Publisher在给Exchange发送消息时,指定Routing Key。
  • Exchange收到消息,并且根据Publisher指定的Routing Key去匹配对应的Queue(s)。当Exchange发现有Queue(s)的Binding Keys中,有与Publisher指定的Routing Key对应的Key时,就将消息发送给对应的Queue(s)。

注:如果在Direct Exchange模式下使用Fanout Exchange的方式发送消息(即rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, "", message);),那么是不会有任何一个队列接收到此消息的。除非在绑定队列时,设置了这样一个Key:key = {""}。Direct Exchange模式将""识别为一个Key。

Topic Exchange

Topic Exchange模式与Direct Exchange模式类似,区别在于Topic Exchange下的Routing Key必须是个多个单词的列表,并且以.分割。

例如:

  • china.news:代表有中国的新闻消息;
  • china.weather:代表中国的天气消息。

Topic Exchange模式中,Queue与Exchange指定Binding Key时可以使用通配符:

通配符 说明
# 代指0个或多个单词。
* 代指1个单词。

Consumer中添加新的listen方法:

@Slf4j
@Component
public class SpringRabbitListener {

    private static final String TOPIC_EXCHANGE_NAME = "linner.topic";
    private static final String TOPIC_QUEUE_1_NAME = "topic.queue1";
    private static final String TOPIC_QUEUE_2_NAME = "topic.queue2";

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(TOPIC_QUEUE_1_NAME),
            exchange = @Exchange(
                    name = TOPIC_EXCHANGE_NAME,
                    type = ExchangeTypes.TOPIC  // 设置交换机类型为Topic
            ),
            key = "china.#"                     // 设置包含通配符的Binding Key
    ))
    public void listenTopicQueue1(String msg) {
        log.info("消费者收到 [" + TOPIC_QUEUE_1_NAME + "] 的消息: [" + msg + "]");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(TOPIC_QUEUE_2_NAME),
            exchange = @Exchange(
                    name = TOPIC_EXCHANGE_NAME,
                    type = ExchangeTypes.TOPIC
            ),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg) {
        log.info("消费者收到 [" + TOPIC_QUEUE_2_NAME + "] 的消息: [" + msg + "]");
    }
}

Publisher中添加新的testSend方法:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String TOPIC_EXCHANGE = "linner.topic";  // 交换机名称

    @Test
    public void testSendTopicExchange() {
        String routingKey = "china.news";
        String message = "Hello " + TOPIC_EXCHANGE + " " + routingKey + "!"; // 消息
        // 发送消息
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, message);
        log.info("发送消息:[" + message + "]");
    }
}

同样是以先启动Consumer,再启动Publisher的方式进行测试,然后再通过修改routingKey来测试不同的匹配规则。


消息转换器

在Spring AMOP中,发送消息的RabbitTemplate.convertAndSend()方法,其实发送的是Object类型。在发送消息时,Spring AMOP会将发送的消息序列化为字节发送给RabbitMQ;接收消息的时候,还会把字节反序列化为Java对象。也就是说,通讯过程传输的消息可以是任意类型的对象。

默认序列化方式

默认情况下Spring AMOP采用的序列化方式是JDK序列化。

ConsumerRabbitConfig中,添加一个新的队列:

@Configuration
public class RabbitConfig {

    public static final String OBJECT_QUEUE_NAME = "object.queue";

    @Bean
    public Queue getObjectQueue() {
        return new Queue(OBJECT_QUEUE_NAME);
    }
}

然后启动Consumer(这是为了展示Spring AMQP默认序列化方式在RabbitMQ中的序列化结果,所以先不在Consumer中接收消息)。

接着在Publisher中创建testSend方法:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String OBJECT_QUEUE_NAME = "object.queue";

    @Test
    public void testSendObjectQueue() {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "Linner");
        msg.put("age", null);
        rabbitTemplate.convertAndSend(OBJECT_QUEUE_NAME, msg);  // 发送一个Map对象
    }
}

启动测试方法,在RabbitMQ管理页面可以看到object.queue中存在这样一条消息:

Spring AMOP 默认序列化结果

可以发现JDK序列化存在下列问题:

  • 数据体积过大;
  • 有安全漏洞;
  • 可读性差。

自定义序列化方式

JSON是一种很好的序列化方式,我们可以将Spring AMQP默认的JDK序列化改成Jackson。

首先导入依赖,由于序列化后还需反序列化,所以PublisherConsumser模块都需要导入Jackson依赖。我们可以将依赖导入它们的父工程:

<!--Jackson核心依赖-->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

然后分别在PublisherConsumser中替换Spring AMQP默认序列化方式:

@Configuration
public class AmqpConfig {

    /**
     * 自定义消息转换器
     */
    @Bean
    public MessageConverter getMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

先运行Publisher.testSendObjectQueue()方法,观察Jackson是否有生效。运行结果如下:

Jackson 序列化结果

最后在Consumser中创建一个新的listen方法:

@Slf4j
@Component
public class SpringRabbitListener {

    private static final String OBJECT_QUEUE_NAME = "object.queue";

    @RabbitListener(queuesToDeclare = @Queue(OBJECT_QUEUE_NAME))
    public void listenObjectQueue(Map<String, Object> msg) {
        log.info("消费者收到 [" + OBJECT_QUEUE_NAME + "] 的消息: " + msg);
    }
}

重启Consumser,观察终端输出的日志,如果成功打印出Map对象,说明反序列化成功。

注:自定义Spring AMQP序列化方式时,Publisher和Consumer配置的消息转换器都必须相同。就是说,序列化和反序列化一定要使用同个消息转换器。