AMQP(Advanced Message Queuing Protocol)是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
RabbitMQ实现了AMQP协议。Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp
是基础抽象,spring-rabbit
是底层的默认实现。
Spring AMQP的依赖如下,其中包含了RabbitMQ的依赖:
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在Spring中使用RabbitMQ可以使用Spring AMQP的依赖。对于RabbitMQ来说,Publisher和Consumer都是使用这个依赖,所以这个依赖坐标可以放在它们的父工程中。
导入依赖后,需要先编写application.yml
,添加RabbitMQ连接信息(Publisher和Consumer都需要编写此配置):
spring:
rabbitmq: # RabbitMQ配置信息
host: localhost # 主机地址
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: linner # 用户名
password: 123456 # 密码
RabbitMQ官方文档中给出了5个Demo,它们分别对应几种不同的用法:
-
基本消息队列(Basic Queue):
-
工作消息队列(Work Queue):
-
发布与订阅(Publish/Subscribe)。
根据交换机类型的不同,可分为三种:
-
Fanout Exchange(广播):
-
Direct Exchange(路由,Routing):
-
Topic Exchange(主题):
-
接下来分别按照这5个模型来实现。
基本消息队列
基本消息队列模型只包含三种角色:
- Publisher:消息发布者,将消息发送到队列Queue。
- Queue:消息队列,负责接受并缓存消息。
- Consumer:订阅队列,处理队列中的消息。
Spring AMQP通过RabbitMQ提供的AMQP接口,整合了RabbitMQ,让RabbitMQ使用起来更加方便。
Spring AMQP提供了RabbitTemplate
来给RabbitMQ发送消息,在使用之时直接@Autowired
即可。
在Publisher
模块中新建一个SpringAmqpTest
测试类,然后使用RabbitTemplate
发送消息:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "Hello Spring AMQP!";
rabbitTemplate.convertAndSend(queueName, message);
log.info("发送消息:[" + message + "]");
}
}
将以上代码放在业务代码中即可实现异步通讯。
在Consumer
中新建一个listener
包,并且在包中创建SpringRabbitListener
,然后使用@RabbitListener
定义接收消息的方法:
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"simple.queue"})
public void listenSimpleQueue(String msg) {
log.info("收到消息:[" + msg + "]");
}
}
直接启动Consumer
的启动类即可开始接收消息。
注:
@RabbitListener
的queues
属性没有创建消息队列的能力。如果先启动Consumer
报错Failed to declare queue: simple.queue
,可以创建一个RabbitConfig
,然后在RabbitConfig
中创建队列;或者直接在启动类中创建队列。在
Consumer
中创建simple.queue
:@Configuration public class RabbitConfig { @Bean public Queue getSimpleQueue() { return new Queue("simple.queue"); } }
另外还需要注意,导入的
Queue
是org.springframework.amqp.core
包下的。
工作队列模型
修改Publisher
,模拟每秒钟发送50条消息:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "work.queue";
String message = "Hello Message__";
for (int i = 0; i < 50; i++) {
String msg = message + i;
rabbitTemplate.convertAndSend(queueName, msg);
log.info("发送消息:[" + msg + "]");
Thread.sleep(20); // 休眠 20ms,1s == 1000ms,所以1s最多只能发50条
}
}
}
修改Consumer
,模拟两个消费者共同接收work.queue
的消息:
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queuesToDeclare = {@Queue(WORK_QUEUE)}) // 声明并订阅Queue
public void listenWorkQueue1(String msg) throws InterruptedException {
log.warn("消费者 1 收到消息:[" + msg + "]"); // 用不同的日志级别区分消费者1和消费者2,更易于阅读
Thread.sleep(20); // 处理速度约:50条/s
}
@RabbitListener(queuesToDeclare = {@Queue(WORK_QUEUE)})
public void listenWorkQueue2(String msg) throws InterruptedException {
log.error("消费者 2 收到消息:[" + msg + "]");
Thread.sleep(200); // 处理速度约:5条/s
}
}
@RabbitListener
的queuesToDeclare
属性是@Queue
类型数组。queuesToDeclare
在指定多个队列的同时,会去声明它们,这样就能不通过创建Bean来声明它们。
先启动Consumer
,然后在启动Publisher
,发现消息被两个消费者平分处理了(一个消费者处理单数号消息,另一个处理双数号消息),并没有按照各自处理速度去分配。这是因为消费者存在消费预取。无论在规定时间内是否能处理得完,消费者都会按照配置去自动预取。所以就导致了两个消费者各取一半消息去处理。
要解决这个消费预取限制,让消费者能根据自己的能力预取消息,可以在application.yml
中修改spring.rebbitmq.listener.simple.prefetch
:
spring:
rabbitmq: # RabbitMQ配置信息
##### 连接信息... #####
listener:
simple:
prefetch: 1 # 每次只能预取 1 条消息,处理完才能获取下一条
然后按照上方,先启动(或重启)Consumer
,然后在启动Publisher
,观察控制台打印的日志可以发现这次处理完50条消息的时间大约为1秒。
工作队列模式的特点是:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
- 通过设置Prefetch来控制消费者预取的消息数量。
发布/订阅模型
发布/订阅模式通过Exchange(交换机)方式实现了将同一消息发送给多个消费者。
发布/订阅的模型如图:
模型中多了Exchange角色,并且过程略有变化:
- Publisher(生产者):值要发送消息的程序,但是不再发送到队列中,而是发给Exchange(交换机)。
- Exchange(交换机):一方面,接收生产者发送的消息;另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Topic:
- Consumer(消费者):订阅队列,处理队列中的消息。。
- Queue:接收消息、缓存消息。
注:Exchange负责消息路由,而不是存储,路由失败则消息丢失。
常见Exchange类型包括:
- Fanout(扇出):广播,将消息交给所有绑定到交换机的队列。
- Direct(定向):路由,把消息交给符合指定Routing Key的队列。
- Topic(话题):通配符,把消息交给符合Routing Pattern(路由模式)的队列。
Spring AMQP提供了声明交换机、队列、绑定关系的API,声明交换机API的继承关系图如下:
Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的Queue。声明队列、声明交换机和绑定关系都要在Consumer中完成。
在Consumer
中添加新的配置类:
@Configuration
public class FanoutConfig {
public static final String FANOUT_EXCHANGE = "linner.fanout"; // 交换机名称
public static final String FANOUT_QUEUE_1 = "fanout.queue1"; // 队列1名称
public static final String FANOUT_QUEUE_2 = "fanout.queue2"; // 队列2名称
/**
* 声明FanoutExchange交换机
*/
@Bean
public FanoutExchange getFanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 声明第1个队列
*/
@Bean("fanoutQueue1")
public Queue getFanoutQueue1() {
return new Queue(FANOUT_QUEUE_1);
}
/**
* 绑定队列1和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean("fanoutQueue2")
public Queue getFanoutQueue2() {
return new Queue(FANOUT_QUEUE_2);
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
/* 第3个队列以相同方式声明,并绑定... */
}
然后修改Consumer
的SpringRabbitListener
类:
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_1})
public void listenFanoutQueue1(String msg) {
log.info("消费者收到 [" + FanoutConfig.FANOUT_QUEUE_1 + "] 的消息: [" + msg + "]");
}
@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_2})
public void listenFanoutQueue2(String msg) {
log.info("消费者收到 [" + FanoutConfig.FANOUT_QUEUE_2 + "] 的消息: [" + msg + "]");
}
}
修改Publisher
,添加新的测试方法模拟发送消息:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String FANOUT_EXCHANGE = "linner.fanout"; // 交换机名称
@Test
public void testSendFanoutExchange() {
String message = "Hello everyone!"; // 消息
// 发送消息
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, "", message);
log.info("发送消息:[" + message + "]");
}
}
先运行Consumer
,然后再运行Publisher
的testSendFanoutExchange()
测试方法发送消息,在Consumer
的终端可以发现,两个消费者都接收到消息了。
RabbitTemplate
的convertAndSend()
方法有不同的实现,可以向队列发送消息,也可以向交换机发送。
Fanout Exchange的特点:
- 可以有多个队列;
- 每个队列都要绑定到Exchange;
- 生产者发送的消息,只能发送到Exchange,由Exchange决定发给哪个队列,生产者无法决定(也不知道);
- 交换机把消息发送给绑定过的所有队列;
- 订阅队列的消费者都能拿到消息。
Fanout Exchange的作用:
- 接收Publisher发送的消息;
- 将消息按照规则路由到与之绑定的队列,Fanout Exchange的会将消息路由到每个绑定的队列;
- 不能缓存消息,路由失败,消息丢失。
Direct Exchange
Direct Exchange通过Binding Key(绑定密钥)和Routing Key(路由密钥)将消息路由到不同的队列上。
在Direct Exchange模式下,队列在声明时可以指定一个或多个Binding Key;Publisher在向Exchange发送消息时,通过指定Routing Key,告诉Exchange要将消息发送到哪个队列上。
在Consumer
中添加新的listen
方法:
@Slf4j
@Component
public class SpringRabbitListener {
private static final String DIRECT_EXCHANGE_NAME = "linner.direct";
private static final String DIRECT_QUEUE_1_NAME = "direct.queue1";
private static final String DIRECT_QUEUE_2_NAME = "direct.queue2";
@RabbitListener(bindings = {@QueueBinding( // 设置绑定关系,绑定队列
value = @Queue(DIRECT_QUEUE_1_NAME), // 设置绑定的队列(会自动创建队列)
exchange = @Exchange(name = DIRECT_EXCHANGE_NAME, // 设置绑定的交换机(会自动创建交换机)
type = ExchangeTypes.DIRECT /* 设置交换机类型,默认为Direct */),
key = {"red", "blue"} // Binding Keys,可以指定多个,当
)})
public void listenDirectQueue1(String msg) {
log.info("消费者收到 [" + DIRECT_QUEUE_1_NAME + "] 的消息: [" + msg + "]");
}
@RabbitListener(bindings = {@QueueBinding(
value = @Queue(DIRECT_QUEUE_2_NAME),
exchange = @Exchange(name = DIRECT_EXCHANGE_NAME,
type = ExchangeTypes.DIRECT /* 默认 */),
key = {"red", "yellow"}
)})
public void listenDirectQueue2(String msg) {
log.info("消费者收到 [" + DIRECT_QUEUE_2_NAME + "] 的消息: [" + msg + "]");
}
}
@RabbitListener
的bindings
属性提供了简便的绑定方式。bindings
属性是@QueueBinding
类型数组,可以同时有多个绑定。其中的每个绑定使用@QueueBinding
来定义。
@QueueBinding
通常需要设置的属性有以下三个:
value
:指明绑定的队列,类型为@Queue
。通过使用@Queue
来声明并订阅队列。exchange
:指明绑定的交换机,类型为@Exchange
。通过使用@Exchange
,可以指定交换机的名称和类型。交换机类型可以使用ExchangeTypes
下的字符串常量。key
:指明Binding Key(s),是一个字符串数组类型。
在Publisher
中添加新的testSend
方法:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String DIRECT_EXCHANGE = "linner.direct"; // 交换机名称
@Test
public void testSendDirectExchange() {
String routingKey = "red";
String message = "Hello direct exchange " + routingKey + "!"; // 消息
// 发送消息(发送消息时需要指定Routing Key)
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, routingKey, message);
log.info("发送消息:[" + message + "]");
}
}
先启动Consumer
,然后再启动Publisher
的testSendDirectExchange()
方法来进行测试。你可以通过将routingKey
修改为blue
或yellow
来测试其它路由密钥。
Direct Exchange的规则是这样的:
- Queue绑定一个或多个Binding Key(s)到Exchange。
- Publisher在给Exchange发送消息时,指定Routing Key。
- Exchange收到消息,并且根据Publisher指定的Routing Key去匹配对应的Queue(s)。当Exchange发现有Queue(s)的Binding Keys中,有与Publisher指定的Routing Key对应的Key时,就将消息发送给对应的Queue(s)。
注:如果在Direct Exchange模式下使用Fanout Exchange的方式发送消息(即
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, "", message);
),那么是不会有任何一个队列接收到此消息的。除非在绑定队列时,设置了这样一个Key:key = {""}
。Direct Exchange模式将""
识别为一个Key。
Topic Exchange
Topic Exchange模式与Direct Exchange模式类似,区别在于Topic Exchange下的Routing Key必须是个多个单词的列表,并且以.
分割。
例如:
china.news
:代表有中国的新闻消息;china.weather
:代表中国的天气消息。
Topic Exchange模式中,Queue与Exchange指定Binding Key时可以使用通配符:
通配符 | 说明 |
---|---|
# |
代指0个或多个单词。 |
* |
代指1个单词。 |
在Consumer
中添加新的listen
方法:
@Slf4j
@Component
public class SpringRabbitListener {
private static final String TOPIC_EXCHANGE_NAME = "linner.topic";
private static final String TOPIC_QUEUE_1_NAME = "topic.queue1";
private static final String TOPIC_QUEUE_2_NAME = "topic.queue2";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(TOPIC_QUEUE_1_NAME),
exchange = @Exchange(
name = TOPIC_EXCHANGE_NAME,
type = ExchangeTypes.TOPIC // 设置交换机类型为Topic
),
key = "china.#" // 设置包含通配符的Binding Key
))
public void listenTopicQueue1(String msg) {
log.info("消费者收到 [" + TOPIC_QUEUE_1_NAME + "] 的消息: [" + msg + "]");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(TOPIC_QUEUE_2_NAME),
exchange = @Exchange(
name = TOPIC_EXCHANGE_NAME,
type = ExchangeTypes.TOPIC
),
key = "#.news"
))
public void listenTopicQueue2(String msg) {
log.info("消费者收到 [" + TOPIC_QUEUE_2_NAME + "] 的消息: [" + msg + "]");
}
}
在Publisher
中添加新的testSend
方法:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String TOPIC_EXCHANGE = "linner.topic"; // 交换机名称
@Test
public void testSendTopicExchange() {
String routingKey = "china.news";
String message = "Hello " + TOPIC_EXCHANGE + " " + routingKey + "!"; // 消息
// 发送消息
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, message);
log.info("发送消息:[" + message + "]");
}
}
同样是以先启动Consumer
,再启动Publisher
的方式进行测试,然后再通过修改routingKey
来测试不同的匹配规则。
消息转换器
在Spring AMOP中,发送消息的RabbitTemplate.convertAndSend()
方法,其实发送的是Object
类型。在发送消息时,Spring AMOP会将发送的消息序列化为字节发送给RabbitMQ;接收消息的时候,还会把字节反序列化为Java对象。也就是说,通讯过程传输的消息可以是任意类型的对象。
默认序列化方式
默认情况下Spring AMOP采用的序列化方式是JDK序列化。
在Consumer
的RabbitConfig
中,添加一个新的队列:
@Configuration
public class RabbitConfig {
public static final String OBJECT_QUEUE_NAME = "object.queue";
@Bean
public Queue getObjectQueue() {
return new Queue(OBJECT_QUEUE_NAME);
}
}
然后启动Consumer
(这是为了展示Spring AMQP默认序列化方式在RabbitMQ中的序列化结果,所以先不在Consumer
中接收消息)。
接着在Publisher
中创建testSend
方法:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String OBJECT_QUEUE_NAME = "object.queue";
@Test
public void testSendObjectQueue() {
Map<String, Object> msg = new HashMap<>();
msg.put("name", "Linner");
msg.put("age", null);
rabbitTemplate.convertAndSend(OBJECT_QUEUE_NAME, msg); // 发送一个Map对象
}
}
启动测试方法,在RabbitMQ管理页面可以看到object.queue
中存在这样一条消息:
可以发现JDK序列化存在下列问题:
- 数据体积过大;
- 有安全漏洞;
- 可读性差。
自定义序列化方式
JSON是一种很好的序列化方式,我们可以将Spring AMQP默认的JDK序列化改成Jackson。
首先导入依赖,由于序列化后还需反序列化,所以Publisher
和Consumser
模块都需要导入Jackson依赖。我们可以将依赖导入它们的父工程:
<!--Jackson核心依赖-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
然后分别在Publisher
和Consumser
中替换Spring AMQP默认序列化方式:
@Configuration
public class AmqpConfig {
/**
* 自定义消息转换器
*/
@Bean
public MessageConverter getMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
先运行Publisher.testSendObjectQueue()
方法,观察Jackson是否有生效。运行结果如下:
最后在Consumser
中创建一个新的listen
方法:
@Slf4j
@Component
public class SpringRabbitListener {
private static final String OBJECT_QUEUE_NAME = "object.queue";
@RabbitListener(queuesToDeclare = @Queue(OBJECT_QUEUE_NAME))
public void listenObjectQueue(Map<String, Object> msg) {
log.info("消费者收到 [" + OBJECT_QUEUE_NAME + "] 的消息: " + msg);
}
}
重启Consumser
,观察终端输出的日志,如果成功打印出Map对象,说明反序列化成功。
注:自定义Spring AMQP序列化方式时,Publisher和Consumer配置的消息转换器都必须相同。就是说,序列化和反序列化一定要使用同个消息转换器。
评论