程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Python 基础

    • Python基础
  • Python 进阶

    • 装饰器与生成器
    • 异常处理
    • 标准库精讲
    • 模块与包
    • pip包管理工具
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
前端 (opens new window)
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Python 基础

    • Python基础
  • Python 进阶

    • 装饰器与生成器
    • 异常处理
    • 标准库精讲
    • 模块与包
    • pip包管理工具
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
前端 (opens new window)
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • 中间件 - RabbitMQ

    • 消息队列 - 介绍
    • RabbitMQ - 介绍
    • RabbitMQ - 安装
    • RabbitMQ - 基础案例
    • RabbitMQ - 应答与发布
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
    • RabbitMQ - 延迟队列
    • RabbitMQ - 高级发布确认
    • RabbitMQ - 优先级
      • 1. 幂等性
        • 概念
        • 消息重复消费问题
        • 消息重复消费解决思路
        • 消费端的幂等性保障
        • 1. 唯一ID + 指纹码机制
        • 2. Redis的原子性操作
      • 2. 优先级队列
        • 使用场景
        • 添加方法
        • 优先级队列实战
      • 3. 惰性队列
        • 使用场景
        • 工作原理
        • 两种模式
        • 内存开销对比
    • SpringBoot整合RabbitMQ
  • 消息数据库
  • 中间件 - RabbitMQ
scholar
2023-11-12
目录

RabbitMQ - 优先级

  • 1. 幂等性
    • 概念
    • 消息重复消费问题
    • 消息重复消费解决思路
    • 消费端的幂等性保障
  • 2. 优先级队列
    • 使用场景
    • 添加方法
    • 优先级队列实战
  • 3. 惰性队列
    • 使用场景
    • 工作原理
    • 两种模式
    • 内存开销对比

# 1. 幂等性

# 概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

可以理解为验证码,只能输入一次,再次重新输入会刷新验证码,原来的验证码失效。

# 消息重复消费问题

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

# 消息重复消费解决思路

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如 时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

# 消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。

业界主流的幂等性有两种操作:

# 1. 唯一ID + 指纹码机制

核心思想:结合业务生成一个全局唯一的标识(指纹码),并利用数据库的主键约束或唯一索引来保证消息不被重复处理。

实现步骤:

  • 生成指纹码:根据业务特点,将消息ID、时间戳、业务数据等信息拼接成一个唯一的指纹码。
  • 数据库去重:在处理消息前,先查询数据库判断指纹码是否存在。
    • 如果存在,说明消息已处理,直接忽略;
    • 如果不存在,将指纹码记录到数据库,并执行业务逻辑。

唯一ID + 指纹码机制优缺点:

  • 优点:实现简单,依赖数据库的能力进行去重。
  • 缺点:在高并发场景下,数据库可能成为性能瓶颈。可通过分库分表、读写分离等策略优化。

# 2. Redis的原子性操作

核心思想:利用Redis提供的原子操作SETNX(Set if Not eXists)来实现幂等性。SETNX会先检查键是否存在,只有当键不存在时,才会设置键值。

实现步骤:

  • 使用SETNX:在处理消息之前,使用SETNX命令以指纹码作为键,设置一个值(如"1"或当前时间戳)。
  • 检查结果:
    • 如果命令返回1,表示键被成功设置,说明消息未被处理,继续执行业务逻辑;
    • 如果命令返回0,表示键已存在,说明消息已被处理,直接忽略。

Redis的原子性操作优缺点:

  • 优点:利用Redis的高性能和原子性特性,适合高并发场景。
  • 缺点:需要外部存储(Redis)支持,增加了系统的复杂性和外部依赖。

# 2. 优先级队列

# 使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧。

但是,天猫商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。

# 添加方法

  • Web 页面添加

    image-20211112222646574

防止图片失效,这里记录步骤
  1. 进入 Web 页面,点击 Queue 菜单,然后点击 Add a new queue
  2. 点击下方的 Maximum priority
  3. 执行第二步,则会自动在 Argument 生成 x-max-priority 字符串
  4. 点击 Add queue 即可添加优先级队列成功

声明队列的时候添加优先级:设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU

Map<String, Object> params = new HashMap();
// 优先级为 10
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
1
2
3
4

注意事项

队列实现优先级需要做的事情有如下:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序

# 优先级队列实战

生产者发送十个消息,如果消息为 info5,则优先级是最高的,当消费者从队列获取消息的时候,优先获取 info5 消息

整合Spring实现

1. RabbitMQ 工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 连接 RabbitMQ 的工具类
 */
public class RabbitMQUtils {
    
    public static Channel getChannel() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置RabbitMQ服务器地址
        connectionFactory.setHost("127.0.0.1");
        // 设置用户名
        connectionFactory.setUsername("admin");
        // 设置密码
        connectionFactory.setPassword("123456");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信道并返回
        return connection.createChannel();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

2. 生产者代码

  1. 定义优先级队列:在发送消息之前,确保队列已经被声明为一个带有优先级设置的队列。
  2. 设置消息属性:对于需要优先处理的消息,设置其priority属性值更高。
  3. 发送消息:使用basicPublish方法发送消息到队列。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

/**
 * 优先级队列生产者示例。
 */
public class PriorityProducer {

    private static final String QUEUE_NAME = "priority_queue";

    public static void main(String[] args) throws Exception{
        // 获取RabbitMQ连接中的通道
        Channel channel = RabbitMQUtils.getChannel();
        
        // 设置具有高优先级的消息属性
        AMQP.BasicProperties highPriorityProps = new AMQP.BasicProperties.Builder().priority(10).build();
        
        // 发送10条消息,其中info5消息优先级最高
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            if (i == 5) {
                // 对于info5消息,使用带有高优先级的属性发送
                channel.basicPublish("", QUEUE_NAME, highPriorityProps, message.getBytes());
            } else {
                // 其他消息正常发送,不设置优先级属性
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            System.out.println("发送消息完成:" + message);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

3. 消费者代码

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 优先级队列的消费者。
 */
public class PriorityConsumer {

    // 定义队列名称
    private final static String QUEUE_NAME = "priority_queue";

    public static void main(String[] args) throws Exception {
        // 获取与RabbitMQ服务器的连接通道
        Channel channel = RabbitMQUtils.getChannel();

        // 设置队列的最大优先级,官方推荐范围是1-10,过高的优先级设置会影响性能
        Map<String, Object> params = new HashMap<>();
        params.put("x-max-priority", 10); // 设置队列的最大优先级为10
        // 声明队列并指定其优先级参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);

        // 消息消费时的回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费的消息:" + message);
        };
        
        // 消费被取消时的回调,例如队列被删除
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };

        // 启动消费者消费消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

效果演示

info 5 的优先级为 10,优先级最高。消费者消费信息效果如图:

image-20240328044444185

# 3. 惰性队列

惰性队列(Lazy Queues)是RabbitMQ中一个重要的特性,它主要针对需要处理大量积压消息的场景,通过优化消息存储方式来提高队列容量,降低内存的使用率。自RabbitMQ 3.6.0版本引入后,惰性队列成为了处理大规模消息堆积情况的一个有效工具。

# 使用场景

惰性队列的使用场景主要集中在以下几个方面:

  1. 长时间消息积压:当消费者因故障、下线或维护等原因无法及时消费消息,导致消息在队列中长时间堆积。
  2. 高容量消息存储:对于需要存储大量消息的应用,使用惰性队列可以有效减少内存占用,增加系统的稳定性。
  3. 提高系统稳定性:减少消息在内存中的占用,降低系统因内存溢出而崩溃的风险。

# 工作原理

惰性队列与默认的队列行为有所不同:

  • 默认行为:消息被尽可能保留在内存中,以便快速地交付给消费者。持久化消息会被写入磁盘,但同时保留一份内存副本。当需要释放内存时,RabbitMQ会将内存中的消息换页到磁盘,这一过程可能导致队列操作暂时阻塞,影响性能。
  • 惰性队列行为:消息在到达队列时直接被写入磁盘,仅当消费者需要时才加载到内存中。这种策略大大减少了内存的使用,但可能增加了消息从生产者到消费者的延迟。

# 两种模式

RabbitMQ的队列可以运行在两种模式下:default(默认模式)和lazy(惰性模式)。这两种模式影响消息是如何存储和从队列中检索的。从RabbitMQ 3.6.0版本开始,引入了惰性队列概念,允许更灵活地控制消息的存储方式,特别是对于需要支持大量消息积压的应用场景。

默认模式(Default Mode)

在default模式下,队列尽可能地将消息保存在内存中,这样可以快速地将消息传递给消费者。即使消息已持久化到磁盘,队列也会尽量保留内存中的副本。这种模式下,当内存使用达到阈值时,RabbitMQ会开始将消息"换页"到磁盘上,这可能会暂时阻塞队列的操作。

惰性模式(Lazy Mode)

lazy模式下,消息在到达队列时立即写入磁盘,仅在必要时才从磁盘加载到内存中。这种策略大大降低了内存的使用量,使得队列可以存储更多的消息,但可能会增加消息从生产者到消费者的传递延迟。

设置队列模式

队列的模式可以在声明队列时通过x-queue-mode参数进行设置,或者通过策略(Policy)来指定。如果队列同时通过这两种方式被设置,策略设置将具有更高的优先级。

通过队列声明设置:在声明队列时,可以直接在参数中指定x-queue-mode为lazy来创建惰性队列。

Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
1
2
3

通过策略设置:也可以通过RabbitMQ管理界面或命令行工具设置策略,对匹配的队列应用惰性模式。

在 Web 页面添加队列时,选择 Lazy mode

image-20211112224619178

# 内存开销对比

image-20211112224647270

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB

注意现有队列模式无法修改

要改变已存在队列的模式,必须先删除现有的队列,然后重新声明。这是因为一旦队列被创建,其模式就无法直接修改。删除并重新声明队列会导致队列中的所有消息丢失,因此在执行这一操作前需要谨慎考虑。

编辑此页 (opens new window)
上次更新: 2024/12/28, 18:32:08
RabbitMQ - 高级发布确认
SpringBoot整合RabbitMQ

← RabbitMQ - 高级发布确认 SpringBoot整合RabbitMQ→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式