SpringBoot整合RabbitMQ
# RabbitMQ的核心组成部分
如何理解 Routing Key? 想象一下邮局系统:当你向某人发送信件时,你会在信封上写上地址。邮局基于这个地址来决定如何分发或传递这封信。在这个比喻中,
地址就像是RabbitMQ的Routing Key
。
生产者
(发送消息的应用程序)发送消息到一个交换机
。交换机
接收到这个消息后,它必须决定如何处理它。应该发送到哪一个或哪些队列
?- 为了做出这个决定,交换机查看消息的
Routing Key
(就像邮局查看信封上的地址)。- 基于这个Routing Key和交换机上的一些绑定规则,交换机决定将消息路由到哪些队列。
# 1. 基本整合
# 1.1 引入maven依赖
<!--amqp依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2
3
4
5
# 1.2 RabbitMQ连接配置
spring:
rabbitmq:
host: 192.168.0.117 #rabbitmq服务器地址
port: 5672 #端口号
username: guest #用户名
password: guest #密码
#virtual-host: #虚拟主机
2
3
4
5
6
7
笔记
RabbitMQ的自动配置类RabbitAutoConfiguration
自动配置了连接工厂ConnectionFactory
。ConnectionFactory
从配置RabbitProperties
中获取连接信息完成连接到RabbitMQ服务器。程序中可以注入RabbitTemplate
给RabbitMQ
发送和接收消息
# 1.3 发送和接收消息API
在Spring Boot中整合RabbitMQ进行消息发送和接收,主要通过RabbitTemplate
类来实现。
# 1.3.1 发送消息API
RabbitTemplate
的convertAndSend
方法用于发送消息,有以下几种常用的重载形式:
// 使用默认交换机发送消息
rabbitTemplate.convertAndSend(String routingKey, Object message);
2
routingKey
: 目标队列的名称。在默认交换机模式下,此处的路由键直接对应队列名称。message
: 消息体内容。可以是任意对象,Spring AMQP会自动进行序列化处理。
// 指定交换机发送消息
rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
2
exchange
: 要发送到的交换机名称。通过指定不同的交换机,可以实现不同的消息路由策略。routingKey
: 路由键。根据不同交换机类型的规则,此参数决定消息路由路径。message
: 消息体内容。发送的消息实体,同样会被序列化。
# 1.3.2 接收消息API
RabbitTemplate
的receiveAndConvert
方法用于接收消息:
// 从指定队列接收消息
Object data = rabbitTemplate.receiveAndConvert(String queueName);
2
queueName
: 队列名称。指定从哪个队列中接收消息。
这个方法会阻塞直到收到消息,然后自动将消息内容从序列化格式转换成Java对象,并返回这个对象。
# 1.4 测试发送数据
@SpringBootTest
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendDirect(){
// Message需要自己构造一个;定义消息体内容和消息头
// rabbitTemplate.send(exchage,routeKey,message);
HashMap<String, Object> map = new HashMap<>();
map.put("name", "baobao");
map.put("age", 18);
map.put("list", Arrays.asList(1,2,3,4,5));
// 使用 convertAndSend(queueName, message) 方法时,RabbitMQ 会使用默认的交换机来发送消息,
// 这个默认的交换机是一个直接交换机,路由键就是队列名称。(这里为了测试没有绑定交换机,直接在mq控制台创建一个队列测试即可)
rabbitTemplate.convertAndSend("baobao", map);
// 使用交换机和路由键发送消息
// 这样,消息会先发送到名为 "direct.exchange" 的交换机
// 然后根据路由键 "routingkey" 路由到相应的队列
// rabbitTemplate.convertAndSend("direct.exchange", "routingkey", map)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
可以发现默认发给RabbitMQ的数据以jdk的方式进行序列化,并且消息头中的content_type
保存了消息体的类型
# 1.5 测试接收数据
@Test
public void testReceiveDirect(){
// 从指定队列中接收数据
Object data = rabbitTemplate.receiveAndConvert("baobao");
System.out.println(data.getClass());
System.out.println(data);
}
2
3
4
5
6
7
# 2. 自定义json序列化
在SpringBoot中,默认可以发送给队列的数据有4种:
- 字符串
- 字节数组
- 实现了Serializable接口的对象
- Message对象实例,是标准的消息对象,包含消息头和消息体
默认发送的对象实例,是以JDK方式进行序列化的,我们也可以定制json的序列化
在自定义配置文件中添加一个自己的MessageConverter,类型是Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitConfig {
// 添加json格式序列化器
@Bean
public Jackson2JsonMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
2
3
4
5
6
7
8
9
10
11
12
# 2.1 测试json序列化
@Test
public void testJsonSerilize(){
// 广播可以不需要传路由键
rabbitTemplate.convertAndSend("exchange.fanout", "", new Person("文亮", 18));
}
2
3
4
5
可以发现消息头中的content_type
属性保存了消息体的类型为application/json
,__TypeId__
属性保存了javabean
的全类名,用于反序列化。
// 测试接收json数据反序列化
@Test
public void testReceiveJson(){
Object o = rabbitTemplate.receiveAndConvert("baobao.news");
System.out.println(o.getClass());
System.out.println(o);
}
2
3
4
5
6
7
# 3. 使用注解接收消息
接收消息有两种方式,一种是使用RabbitTemplate的receiveAndConvert方法
,另一种是使用@RabbitListener注解
。
# 3.1. @RabbitListener
笔记
如果你在配置类中预先定义并绑定了队列和交换机,那么在消费者监听消息时,你可以使用@RabbitListener
注解并指定队列名接受消息,不需要再额外的配置参数。
首先先在SpringBoot启动类上面加上加@EnableRabbit
注解,表示开启基于注解的RabbitMQ模式
@SpringBootApplication
@EnableRabbit
public class MainApplicationMQ {
public static void main(String[] args) {
SpringApplication.run(MainApplicationMQ.class, args);
}
}
2
3
4
5
6
7
编写1个Service,声明一个监听方法,方法上标注@RabbitListener
,传入需要监听的队列名。监听方法可以接收的参数如下(无需保证参数顺序):
Message
对象:原生消息的详细信息,包括消息头+消息体自定义实体类
对象:用消息体反序列化后得到的JavabeanChannel
对象:当前传输的数据通道
@Service
public class PersonService {
/**
* 监听队列"baobao"中的消息。当队列中有消息时,此方法会被自动调用。
*
* @param message 原生消息的详细信息,包括消息头和消息体。
* @param person 消息体反序列化得到的Java对象。
* @param channel 当前传输数据的Channel通道。
*/
@RabbitListener(queues = "baobao")
public void listen(Message message, Person person, Channel channel) {
System.out.println("接收到的原生消息详情: " + message);
System.out.println("反序列化后的Person对象: " + person);
System.out.println("数据传输的Channel通道: " + channel);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
启动该消费者,从队列中消费1条消息
控制台打印结果
// message
(Body:'{"name":"文亮","age":18}' MessageProperties [headers={__TypeId__=com.baobao.springbootdemo.mq.bean.Person}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=amq.fanout, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-81sPttG477H4uOtS_e6tHA, consumerQueue=baobao])
// person
Person{name='文亮', age=18}
// channel
Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.56.55:5672/,1), conn: Proxy@21a02097 Shared Rabbit Connection: SimpleConnection@2496b460 [delegate=amqp://guest@192.168.56.55:5672/, localPort= 6257]
2
3
4
5
6
注意事项
- 如果
只有一个消费客户端
,那么rabbitmq默认会将队列中的所有一次性发到消费者,但是消费者接收到消息后只能1个1个处理,只有处理完1个消息(即监听方法运行完毕,哪怕执行时间很长),才能继续处理下一个消息 - 如果启动
多个客户端
,都对应同一个监听消息的方法,那么对于同一个消息,只有1个客户端可以接收到 - 监听方法中的消息实例对象要与发送端对应,比如发送端发送字节数组那么接收端也要声明为字节数组参数;发送端发送Person对象那么接收端也要声明为Person类型参数
# 3. 2. @RabbitHandler
我们还可以采用@RabbitListener
配合@RabbitHandler
的方式完成对消息的监听:
@RabbitListener
:标注在类上,指定监听哪些队列@RabbitHandler
:标注在每个接收并处理不同消息的重载方法上,区分处理不同类型的消息- 使用@RabbitHandler注解允许一个类处理来自同一队列的不同类型的消息,而不是为每种消息类型创建一个单独的消费者。
@Service
//PersonService会监听到来自指定队列("baobao", "baobao.news", "baobao.map")的消息
@RabbitListener(queues = {"baobao","baobao.news","baobao.map"}) // 监听指定的三个队列
public class PersonService {
@RabbitHandler
//如果消息的内容是Person类型,那么handlePersonMsg方法将被调用。
public void handlePersonMsg(Person person){
System.out.println(person);
}
@RabbitHandler
//如果消息的内容是User类型,那么handleUserMsg方法将被调用。
public void handleUserMsg(User user){
System.out.println(user);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 4. 创建交换机、队列、绑定关系
创建交换机、队列、绑定关系的目的是为了定义消息的传递路径。交换机根据绑定关系和路由键,将生产者发送的消息路由到一个或多个队列,然后消费者从队列中获取消息并处理。
交换机(Exchange)
: 交换机是接收生产者发送的消息,并根据路由规则将消息分发到一个或多个队列的组件。队列(Queue)
: 队列是用来存储消息的地方,消费者从队列中获取消息并处理。绑定(Binding)
: 绑定是交换机和队列之间的关系。一个队列可以绑定到多个交换机,反之亦然。绑定可以有一个路由键(Routing Key),用来指定哪些消息会被路由到绑定的队列。
当我们在RabbitMQ中创建交换机、队列和绑定时,每种组件都有一系列的属性和参数可以设置。以下是这些组件的参数列表:
Exchange(交换机)
:
Name
: 交换机的名称。Type
: 交换机的类型,例如:direct, topic, fanout, headers。Durable
: 如果设置为true,则交换机会在RabbitMQ服务器重启后仍然存在。Auto Delete
: 如果设置为true,当没有队列绑定到这个交换机时,它会自动被删除。Internal
: 如果设置为true,则交换机不能被客户端直接发送消息,只能通过交换机路由消息。Arguments
: 一组可选的参数,用于扩展RabbitMQ的核心功能。例如,"alternate-exchange"可以指定一个备用交换机。
Queue(队列)
Name
: 队列的名称。Durable (durable)
: 如果设置为true,队列会在RabbitMQ服务器重启后仍然存在。Exclusive
: 如果设置为true,队列只能被创建该队列的连接使用,并且队列在该连接关闭后会自动删除。Auto Delete
: 如果设置为true,当没有消费者订阅这个队列时,它会自动被删除。Arguments
: 一组可选的参数,例如,"x-message-ttl"可以设置消息的存活时间。
Binding(绑定关系)
Destination
: 绑定的目的地名称,这可以是一个队列或另一个交换机。Destination Type
: 绑定的目的地类型,可以是QUEUE或EXCHANGE。Exchange
: 绑定的源交换机的名称。Routing Key
: 用于匹配消息的路由键。Arguments
: 一组可选的参数,可以用于头交换机的绑定,或者为某些特殊的绑定行为提供参数。
# 4.1 利用AmqpAdmin创建和绑定交换机和队列
给程序中注入AmqpAdmin
可以实现对RabbitMQ
的管理,它的declareXXX
方法可以创建exchange
、queue
、binding
等
当 Spring 容器启动时,会自动调用带有 @PostConstruct 注解的方法,从而创建交换机、队列和绑定关系。
@Configuration
public class MyRabbitConfig {
@Autowired
private AmqpAdmin amqpAdmin;
@PostConstruct // Spring容器启动时自动调用此方法
public void createExchangeQueueBinding() {
// 创建Direct类型的交换机
amqpAdmin.declareExchange(new DirectExchange("exchange.amqpadmin"));
// 创建队列
amqpAdmin.declareQueue(new Queue("queue.amqpadmin"));
// 创建绑定关系
// 参数1: 队列名称
// 参数2: 目标类型(这里是队列)
// 参数3: 交换机名称
// 参数4: 路由键
// 参数5: 其他参数(这里不需要,传null)
amqpAdmin.declareBinding(new Binding(
"queue.amqpadmin", // 队列名称
Binding.DestinationType.QUEUE, // 目标类型(绑定的是队列)
"exchange.amqpadmin", // 交换机名称
"queue.amqpadmin", // 路由键
null // 其他参数
));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在Spring AMQP中,AmqpAdmin
接口提供了一系列用于管理RabbitMQ对象的方法,这包括交换机(Exchanges)、队列(Queues)、绑定(Bindings)等。
声明交换机
// 声明直接类型交换机
amqpAdmin.declareExchange(new DirectExchange("exchange.name"));
2
DirectExchange(String name)
: 交换机的名称。
声明队列
// 声明队列
amqpAdmin.declareQueue(new Queue("queue.name", true, false, false, null));
2
Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
: 队列名称,是否持久化,是否排他,是否自动删除,其他参数。
创建绑定
// 创建绑定
amqpAdmin.declareBinding(new Binding("queue.name", Binding.DestinationType.QUEUE, "exchange.name", "routing.key", null));
2
Binding(String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
: 目标名称(队列或交换机),目标类型,交换机名称,路由键,其他参数。
注意事项
使用amqpAdmin
创建交换机、队列、绑定关系时,会先检查rabbitmq有是否已经存在对应的交换机、队列、绑定关系,如果不存在才创建
,已存在就什么都不做。
# 4.2 配置类创建和绑定交换机和队列
- 在SpringBoot中,使用@Configuration注解的类表示这是一个配置类,可以用来定义和绑定队列、交换机以及它们之间的绑定关系。
- 这种方法允许你在应用启动时自动创建所需的RabbitMQ结构,无需手动在RabbitMQ管理界面或通过AmqpAdmin进行创建。
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建交换机Bean
* 参数1 name: "test.direct",交换机的名称。
* 参数2 durable: true,表示创建的交换机是持久化的,RabbitMQ重启后交换机仍然存在。
* 参数3 autoDelete: false,表示交换机不会自动删除。
* @return DirectExchange实例
*/
@Bean
public Exchange exchange() {
return new DirectExchange("test.direct", true, false);
}
/**
* 创建队列Bean
* 参数1 name: "test.queue",队列的名称。
* 参数2 durable: true,表示队列是持久化的,RabbitMQ重启后队列仍然存在。
* 参数3 exclusive: false,表示队列不是排他的。
* 参数4 autoDelete: false,表示队列不会自动删除。
* @return Queue实例
*/
@Bean
public Queue queue() {
return new Queue("test.queue", true, false, false);
}
/**
* 创建绑定Bean
* 将队列和交换机绑定,并设置用于路由的键:"queue"
* 参数1 destination: "test.queue",绑定的目标队列名称。
* 参数2 destinationType: Binding.DestinationType.QUEUE,表示绑定类型为队列。
* 参数3 exchange: "test.direct",交换机名称。
* 参数4 routingKey: "queue",绑定时使用的路由键。
* 参数5 arguments: null,绑定时的其他参数,这里没有使用。
* @return Binding实例
*/
@Bean
public Binding binding() {
return new Binding("test.queue",
Binding.DestinationType.QUEUE, "test.direct", "queue", null);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
笔记
直接在容器中放置Bean
相比于直接利用AmqpAdmin
来创建交换机、队列、绑定关系
的区别是,容器中放置Bean的方式是懒加载的
,也就是说并不会在容器启动时就创建,而是等我们的应用第一次连接rabbitmq进行操作的时候才创建交换机、队列、绑定关系
。其底层原理是:当连接第一次创建时,会回调连接创建的监听方法,从容器中查找所有Exchange
、Queue
和Binding
对象,然后利用AmqpAdmin
将它们进行创建。也就是说SpringBoot应用刚启动时是不会创建这些对象的,只有程序首次连接rabbitmq获取connection时才会创建- 只有
rabbitmq
中不存在对应的交换机、队列、绑定关系时才会创建,已存在就什么都不做
# 4.3 配置类利用Builder模式链式创建
在Spring Boot中,我们还可以使用构建器模式(Builder Pattern)来优雅地创建交换机、队列和绑定关系。这种方式使得代码更加清晰易读,同时也简化了配置的过程。
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建一个直接交换机
* - name: 交换机名称,这里是 "direct.exchange"
* - durable: 是否持久化,设置为 true,表示交换机在RabbitMQ重启后依然存在
* @return 返回创建好的交换机
*/
@Bean
public Exchange exchange2(){
return ExchangeBuilder
.directExchange("direct.exchange") // 参数1: 交换机名称
.durable(true) // 参数2: 是否持久化
.build();
}
/**
* 创建一个队列
* - name: 队列名称,这里是 "queue"
* - durable: 是否持久化,设置为 true,表示队列在RabbitMQ重启后依然存在
* @return 返回创建好的队列
*/
@Bean
public Queue queue2(){
return QueueBuilder
.durable("queue") // 参数1: 队列名称, 参数2: 是否持久化
.build();
}
/**
* 创建一个绑定关系
* - destination: 目标队列名称,这里是 "queue"
* - destinationType: 目标类型,这里是队列
* - exchange: 交换机名称,这里是 "direct.exchange"
* - routingKey: 路由键,这里是 "routingkey"
* @param queue 引用创建的队列
* @param exchange 引用创建的交换机
* @return 返回创建好的绑定关系
*/
@Bean
public Binding binding2(
@Qualifier("queue2") Queue queue, // 使用Qualifier指定注入的Bean名称
@Qualifier("exchange2") Exchange exchange
){
return BindingBuilder
.bind(queue) // 参数1: 队列
.to(exchange) // 参数2: 交换机
.with("routingkey") // 参数3: 路由键
.noargs(); // 参数4: 没有额外参数
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
实际开发推荐使用Builder模式链式创建,链式方法的语义更加清晰
# 4.4 基于注解创建和绑定交换机和队列 (消费者中定义)
- 如果你在已经通过 @RabbitListener 注解的 bindings 属性定义了队列、交换机和绑定关系,当 Spring 容器启动时,RabbitMQ 也会自动创建你所定义的队列和交换机(如果它们不存在的话),并将它们绑定起来。
- 这样,你就不需要在配置文件中再次定义它们,可以直接在测试代码或业务代码中发送消息到指定的交换机,RabbitMQ 会根据你定义的绑定关系将消息路由到相应的队列
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 使用 @RabbitListener 注解定义队列、交换机及它们之间的绑定关系,并在消费者中接收消息
*/
@Component
public class EmailService {
// 使用 @RabbitListener 注解定义该方法为消息监听方法,指定监听的队列和交换机
@RabbitListener(bindings = @QueueBinding(
// 定义队列
// value属性指定队列的配置
// name属性指定队列名称
// autoDelete属性定义队列是否是自动删除,false表示不自动删除
value = @Queue(value = "email.fanout.queue", autoDelete = "false"),
// 定义交换机
// value属性指定交换机名称
// type属性指定交换机类型,这里为广播类型(fanout)
exchange = @Exchange(value = "fanout_order_exchange", type = ExchangeTypes.FANOUT)
))
// @RabbitHandler 注解标识这是一个消息处理方法,用于处理接收到的消息
public void messageReceive(String message) {
// 接收到消息后的处理逻辑,例如在这里可以进行邮件发送等操作
System.out.println("email-------------->" + message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27