RabbitMQ - 优先级
# 1. 幂等性
# 概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。
可以理解为验证码,只能输入一次,再次重新输入会刷新验证码,原来的验证码失效。
# 消息重复消费问题
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
# 消息重复消费解决思路
MQ 消费者的幂等性的解决一般使用全局 ID
或者写个唯一标识
比如 时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断
,或者按自己的规则生成一个全局唯一 id
,每次消费消息时用该 id 先判断该消息是否已消费过。
# 消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。
业界主流的幂等性有两种操作:
# 1. 唯一ID + 指纹码机制
核心思想:结合业务生成一个全局唯一的标识(指纹码),并利用数据库的主键约束或唯一索引来保证消息不被重复处理。
实现步骤:
- 生成指纹码:根据业务特点,将消息ID、时间戳、业务数据等信息拼接成一个唯一的指纹码。
- 数据库去重:在处理消息前,先查询数据库判断指纹码是否存在。
- 如果存在,说明消息已处理,直接忽略;
- 如果不存在,将指纹码记录到数据库,并执行业务逻辑。
唯一ID + 指纹码机制优缺点:
- 优点:实现简单,依赖数据库的能力进行去重。
- 缺点:在高并发场景下,数据库可能成为性能瓶颈。可通过分库分表、读写分离等策略优化。
# 2. Redis的原子性操作
核心思想:利用Redis提供的原子操作SETNX
(Set if Not eXists)来实现幂等性。SETNX
会先检查键是否存在,只有当键不存在时,才会设置键值。
实现步骤:
- 使用SETNX:在处理消息之前,使用
SETNX
命令以指纹码作为键,设置一个值(如"1"或当前时间戳)。 - 检查结果:
- 如果命令返回1,表示键被成功设置,说明消息未被处理,继续执行业务逻辑;
- 如果命令返回0,表示键已存在,说明消息已被处理,直接忽略。
Redis的原子性操作优缺点:
- 优点:利用Redis的高性能和原子性特性,适合高并发场景。
- 缺点:需要外部存储(Redis)支持,增加了系统的复杂性和外部依赖。
# 2. 优先级队列
# 使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧。
但是,天猫商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。
# 添加方法
Web 页面添加
防止图片失效,这里记录步骤
- 进入 Web 页面,点击 Queue 菜单,然后点击
Add a new queue
- 点击下方的
Maximum priority
- 执行第二步,则会自动在
Argument
生成x-max-priority
字符串 - 点击
Add queue
即可添加优先级队列成功
声明队列的时候添加优先级:设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
// 优先级为 10
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
2
3
4
注意事项
队列实现优先级需要做的事情有如下:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
# 优先级队列实战
生产者发送十个消息,如果消息为 info5
,则优先级是最高的,当消费者从队列获取消息的时候,优先获取 info5
消息
整合Spring实现
1. RabbitMQ 工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 连接 RabbitMQ 的工具类
*/
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置RabbitMQ服务器地址
connectionFactory.setHost("127.0.0.1");
// 设置用户名
connectionFactory.setUsername("admin");
// 设置密码
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信道并返回
return connection.createChannel();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2. 生产者代码
- 定义优先级队列:在发送消息之前,确保队列已经被声明为一个带有优先级设置的队列。
- 设置消息属性:对于需要优先处理的消息,设置其
priority
属性值更高。 - 发送消息:使用
basicPublish
方法发送消息到队列。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
/**
* 优先级队列生产者示例。
*/
public class PriorityProducer {
private static final String QUEUE_NAME = "priority_queue";
public static void main(String[] args) throws Exception{
// 获取RabbitMQ连接中的通道
Channel channel = RabbitMQUtils.getChannel();
// 设置具有高优先级的消息属性
AMQP.BasicProperties highPriorityProps = new AMQP.BasicProperties.Builder().priority(10).build();
// 发送10条消息,其中info5消息优先级最高
for (int i = 1; i <= 10; i++) {
String message = "info" + i;
if (i == 5) {
// 对于info5消息,使用带有高优先级的属性发送
channel.basicPublish("", QUEUE_NAME, highPriorityProps, message.getBytes());
} else {
// 其他消息正常发送,不设置优先级属性
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
3. 消费者代码
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
/**
* 优先级队列的消费者。
*/
public class PriorityConsumer {
// 定义队列名称
private final static String QUEUE_NAME = "priority_queue";
public static void main(String[] args) throws Exception {
// 获取与RabbitMQ服务器的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 设置队列的最大优先级,官方推荐范围是1-10,过高的优先级设置会影响性能
Map<String, Object> params = new HashMap<>();
params.put("x-max-priority", 10); // 设置队列的最大优先级为10
// 声明队列并指定其优先级参数
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
// 消息消费时的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("消费的消息:" + message);
};
// 消费被取消时的回调,例如队列被删除
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
// 启动消费者消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
效果演示
info 5 的优先级为 10,优先级最高。消费者消费信息效果如图:
# 3. 惰性队列
惰性队列(Lazy Queues)是RabbitMQ中一个重要的特性,它主要针对需要处理大量积压消息的场景,通过优化消息存储方式来提高队列容量,降低内存的使用率。自RabbitMQ 3.6.0版本引入后,惰性队列成为了处理大规模消息堆积情况的一个有效工具。
# 使用场景
惰性队列的使用场景主要集中在以下几个方面:
- 长时间消息积压:当消费者因故障、下线或维护等原因无法及时消费消息,导致消息在队列中长时间堆积。
- 高容量消息存储:对于需要存储大量消息的应用,使用惰性队列可以有效减少内存占用,增加系统的稳定性。
- 提高系统稳定性:减少消息在内存中的占用,降低系统因内存溢出而崩溃的风险。
# 工作原理
惰性队列与默认的队列行为有所不同:
- 默认行为:消息被尽可能保留在内存中,以便快速地交付给消费者。持久化消息会被写入磁盘,但同时保留一份内存副本。当需要释放内存时,RabbitMQ会将内存中的消息换页到磁盘,这一过程可能导致队列操作暂时阻塞,影响性能。
- 惰性队列行为:消息在到达队列时直接被写入磁盘,仅当消费者需要时才加载到内存中。这种策略大大减少了内存的使用,但可能增加了消息从生产者到消费者的延迟。
# 两种模式
RabbitMQ的队列可以运行在两种模式下:default
(默认模式)和lazy
(惰性模式)。这两种模式影响消息是如何存储和从队列中检索的。从RabbitMQ 3.6.0版本开始,引入了惰性队列概念,允许更灵活地控制消息的存储方式,特别是对于需要支持大量消息积压的应用场景。
默认模式(Default Mode)
在default
模式下,队列尽可能地将消息保存在内存中,这样可以快速地将消息传递给消费者。即使消息已持久化到磁盘,队列也会尽量保留内存中的副本。这种模式下,当内存使用达到阈值时,RabbitMQ会开始将消息"换页"到磁盘上,这可能会暂时阻塞队列的操作。
惰性模式(Lazy Mode)
lazy
模式下,消息在到达队列时立即写入磁盘,仅在必要时才从磁盘加载到内存中。这种策略大大降低了内存的使用量,使得队列可以存储更多的消息,但可能会增加消息从生产者到消费者的传递延迟。
设置队列模式
队列的模式可以在声明队列时通过x-queue-mode
参数进行设置,或者通过策略(Policy)来指定。如果队列同时通过这两种方式被设置,策略设置将具有更高的优先级。
通过队列声明设置:在声明队列时,可以直接在参数中指定x-queue-mode
为lazy
来创建惰性队列。
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
2
3
通过策略设置:也可以通过RabbitMQ管理界面或命令行工具设置策略,对匹配的队列应用惰性模式。
在 Web 页面添加队列时,选择 Lazy mode
# 内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB
注意现有队列模式无法修改
要改变已存在队列的模式,必须先删除现有的队列,然后重新声明。这是因为一旦队列被创建,其模式就无法直接修改。删除并重新声明队列会导致队列中的所有消息丢失,因此在执行这一操作前需要谨慎考虑。