程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • 中间件 - RabbitMQ

    • 消息队列 - 介绍
    • RabbitMQ - 介绍
    • RabbitMQ - 安装
    • RabbitMQ - 基础案例
    • RabbitMQ - 应答与发布
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
    • RabbitMQ - 延迟队列
      • 1. 延迟队列介绍
      • 2. TTL的两种设置
      • 3. 整合SpringBoot
      • 3. 队列TTL
        • 3.1 RabbitMQ配置类
        • 3.2 Controller 层代码
        • 3.3 消费者代码
      • 4. 延时队列TTL优化
        • 4.1 RabbitMQ配置类
        • 4.2 Controller 新增方法
      • 5. Rabbitmq插件实现延迟队列
      • 6. 插件实战
        • 6.1 配置类代码
        • 6.2 生产者代码
        • 6.3 消费者代码
      • 7. 总结
    • RabbitMQ - 高级发布确认
    • RabbitMQ - 优先级
    • SpringBoot整合RabbitMQ
  • 消息数据库
  • 中间件 - RabbitMQ
scholar
2023-11-10
目录

RabbitMQ - 延迟队列

  • 1. 延迟队列介绍
  • 2. TTL的两种设置
  • 3. 整合SpringBoot
  • 3. 队列TTL
    • 3.1 RabbitMQ配置类
    • 3.2 Controller 层代码
    • 3.3 消费者代码
  • 4. 延时队列TTL优化
    • 4.1 RabbitMQ配置类
    • 4.2 Controller 新增方法
  • 5. Rabbitmq插件实现延迟队列
  • 6. 插件实战
    • 6.1 配置类代码
    • 6.2 生产者代码
    • 6.3 消费者代码
  • 7. 总结

# 1. 延迟队列介绍

延迟队列概念:

延迟队列是一种特殊的队列,它的核心特性是队列中的元素能在指定的延迟时间后被消费,而不是立即消费。这种队列允许元素在被实际处理之前等待一段指定的时间。简单来说,延迟队列用于存放那些需要在将来某个特定时刻才执行的任务。

延迟队列使用场景:

  1. 订单自动取消:在线购物平台中,用户下单后,若在一定时间内未支付,系统自动取消订单。

  2. 提醒功能:如新用户注册后若几天内未登录,则自动发送提醒邮件或短信。

  3. 预约提醒:用户预约会议或活动,系统在会议开始前自动发送提醒通知。

  4. 自动处理:用户请求退款,若在一定时间内未得到处理,则自动通知运营人员跟进。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于「如果账单一周内未支付则进行自动结算」这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:「订单十分钟内未支付则关闭」,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

下载

# 2. TTL的两种设置

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

2.1 队列设置 TTL

队列级别的TTL通过在创建队列时指定x-message-ttl参数来实现,此参数定义了队列中所有消息的存活时间。

// 创建队列时设置TTL属性
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 设置队列中消息的TTL为5000毫秒(5秒)

// 使用QueueBuilder构建队列并指定TTL参数
Queue queue = QueueBuilder.durable("QA") // 队列名为"QA"
                          .withArguments(args) // 添加TTL设置
                          .build(); // 构建队列实例
1
2
3
4
5
6
7
8
  • QueueBuilder.durable("QA"): 创建一个名为"QA"的持久化队列。
  • .withArguments(args): 将参数args(包含TTL设置)应用于队列。
  • args.put("x-message-ttl", 5000): 在队列参数中设置消息的TTL值为5000毫秒(5秒),表示队列中的消息如果5秒内未被消费,则会自动过期。

2.2 消息设置 TTL

针对单条消息设置TTL可以在发送消息时通过MessageProperties来实现,允许对每条消息设置不同的TTL值。

// 发送消息时设置TTL
rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor -> {
    messagePostProcessor.getMessageProperties().setExpiration("5000"); // 设置该消息的TTL为5000毫秒(5秒)
    return messagePostProcessor;
});
1
2
3
4
5
  • convertAndSend: 发送消息到交换机X,路由键为XC。
  • messagePostProcessor -> {...}: 是一个回调函数,用于在消息发送前对消息进行处理。
  • setExpiration("5000"): 在消息属性中设置该消息的TTL值为5000毫秒(5秒),意味着这条消息如果在5秒内未被消费,则会过期。

两者区别

  • 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息的过期判断发生在消息即将被消费之前。这意味着,如果队列头部的消息未过期,后续即使有已过期的消息,也不会立即被删除。因此,在队列积压严重时,过期消息可能会比预期存活更长时间。

  • 另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

# 3. 整合SpringBoot

前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

下面是如何通过整合SpringBoot和RabbitMQ来实现延时队列的步骤:

  1. 创建SpringBoot工程:首先,需要创建一个Maven工程或者Spring Boot工程。

  2. 添加依赖:在pom.xml文件中添加Spring Boot与RabbitMQ的相关依赖。其中,spring-boot-starter-amqp是必须的,因为它包含了Spring AMQP和RabbitMQ的整合支持。

    <!-- 核心Spring Boot依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Web依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 测试依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- JSON处理依赖 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <!-- Lombok便捷注解 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- RabbitMQ 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
  3. 配置RabbitMQ连接:在application.yml文件中配置RabbitMQ的连接信息,包括服务器地址、端口、用户名和密码。

    server:
      port: 8808
    spring:
      rabbitmq:
        host: 112.74.169.231  # RabbitMQ服务器地址
        port: 5672            # 端口号
        username: admin       # 用户名
        password: 123456      # 密码
    
    1
    2
    3
    4
    5
    6
    7
    8

    端口号8808是Spring Boot应用的端口,根据实际情况进行配置。

  4. 创建启动类:新建启动类RabbitmqApplication,启动Spring Boot应用。可以通过注入RabbitTemplate来发送和接收消息。

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class RabbitmqApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RabbitmqApplication.class, args);
        }
        
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

通过上述步骤,你就可以开始使用SpringBoot与RabbitMQ来实现延时队列等功能了。后续的消息发送和接收可以通过在Spring Boot中注入和使用RabbitTemplate实例来完成。

# 3. 队列TTL

代码架构图

创建两个队列 QA 和 QB,两个队列的 TTL 分别设置为 10S 和 40S,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

image-20211110225716769

原先配置队列信息,写在了生产者和消费者代码中,现在可写在配置类中,生产者只发消息,消费者只接受消息

# 3.1 RabbitMQ配置类

通过配置类的方式来定义队列(Queue)、交换机(Exchange)以及它们之间的绑定关系(Binding)。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    // 定义交换机、队列和路由键的名称常量
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_QA = "QA";
    public static final String QUEUE_QB = "QB";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";

    /**
     * 声明xExchange交换机
     * @return Direct类型的交换机X
     */
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 声明yExchange死信交换机
     * @return Direct类型的死信交换机Y
     */
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明队列QA,设置TTL为10秒,并绑定到死信交换机Y
     * @return 配置了TTL属性的队列QA
     */
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 设置死信交换机
        params.put("x-dead-letter-routing-key", "YD"); // 设置死信路由键
        params.put("x-message-ttl", 10000); // 设置消息TTL为10秒
        return QueueBuilder.durable(QUEUE_QA).withArguments(params).build();
    }

    /**
     * 绑定队列QA到交换机X
     * @return 绑定对象
     */
    @Bean
    public Binding queueBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    /**
     * 声明队列QB,设置TTL为40秒,并绑定到死信交换机Y
     * @return 配置了TTL属性的队列QB
     */
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 同上
        params.put("x-dead-letter-routing-key", "YD"); // 同上
        params.put("x-message-ttl", 40000); // 设置消息TTL为40秒
        return QueueBuilder.durable(QUEUE_QB).withArguments(params).build();
    }

    /**
     * 绑定队列QB到交换机X
     * @return 绑定对象
     */
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    
    /**
     * 声明死信队列QD
     * @return 死信队列QD
     */
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     * 绑定死信队列QD到死信交换机Y
     * @return 绑定对象
     */
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

# 3.2 Controller 层代码

在Controller 层使用RabbitTemplate来发送消息,通过指定的交换机和路由键将消息正确路由到目标队列。通过不同的路由键,相同的消息可以发送到具有不同TTL(Time-To-Live)的队列中。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;

/**
 * 演示如何通过Spring Boot应用的Controller层向RabbitMQ发送消息。
 */
@RestController
@RequestMapping("/ttl") 
@Slf4j 
public class SendMsgController {
    
    @Autowired // 自动注入RabbitTemplate
    private RabbitTemplate rabbitTemplate;

    /**
     * 通过路径变量接收消息内容,并将该消息发送到两个不同TTL设置的队列中。
     * @param message 消息内容,通过URL路径传入
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable("message") String message) {
        // 记录日志,显示当前时间和要发送的消息
        log.info("当前时间:{}, 发送一条信息给两个TTL队列: {}", new Date(), message);
        // 发送消息到交换机X,路由键为XA,消息内容附加说明来自10S TTL队列
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列: " + message);
        // 发送消息到交换机X,路由键为XB,消息内容附加说明来自40S TTL队列
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列: " + message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 3.3 消费者代码

监听死信队列是否出现消息,并在收到消息时将其内容打印到控制台。这里使用了@RabbitListener注解来标记一个方法作为消息监听器,当指定的队列(死信队列“QD”)中有消息到达时,该方法会自动被调用。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;

/**
 * 死信队列消息消费者。
 */
@Slf4j 
@Component 
public class DeadLetterQueueConsumer {
    /**
     * 使用@RabbitListener注解监听死信队列QD,当队列中有消息时自动调用此方法。
     * @param message 接收到的消息,Spring AMQP Message对象,包含了消息的详细信息。
     */
    @RabbitListener(queues = "QD") // 监听死信队列QD
    public void receiveD(Message message) {
        String msg = new String(message.getBody()); // 从Message对象获取消息内容
        // 记录日志,显示当前时间和收到的死信队列消息内容
        log.info("当前时间:{}, 收到死信队列信息: {}", new Date(), msg);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

发起一个请求:http://localhost:8808/ttl/sendMsg/我是可乐 (opens new window)

image-20211111002950970

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

番外

当然也可以自定义 RabbitMQ 的配置信息

点击查看代码
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * RabbitMQ配置类,用于自定义RabbitMQ的连接信息和RabbitTemplate配置。
 */
@Configuration // 标识为配置类
public class RabbitConfig {

    // 从application.yml配置文件中读取RabbitMQ的主机地址
    @Value("${spring.rabbitmq.host}")
    private String host;

    // 从application.yml配置文件中读取RabbitMQ的端口
    @Value("${spring.rabbitmq.port}")
    private int port;

    // 从application.yml配置文件中读取RabbitMQ的用户名
    @Value("${spring.rabbitmq.username}")
    private String username;

    // 从application.yml配置文件中读取RabbitMQ的密码
    @Value("${spring.rabbitmq.password}")
    private String password;

    /**
     * 定义创建连接工厂的Bean。
     */
    @Bean("connectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host); // 设置RabbitMQ服务器地址
        connectionFactory.setPort(port); // 设置RabbitMQ服务器端口
        connectionFactory.setUsername(username); // 设置连接RabbitMQ服务器的用户名
        connectionFactory.setPassword(password); // 设置连接RabbitMQ服务器的密码
        connectionFactory.setVirtualHost("/"); // 设置RabbitMQ的虚拟主机,默认为"/"
        return connectionFactory;
    }

    /**
     * 定义RabbitTemplate的Bean,用于发送和接收消息。
     * 使用SCOPE_PROTOTYPE,表示每次注入RabbitTemplate时都会创建一个新的实例。
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory); // 使用自定义的连接工厂创建RabbitTemplate
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# 4. 延时队列TTL优化

在这里新增了一个队列 QC,该队列不设置 TTL 时间,根据前端的请求确定 TTL 时间,绑定关系如下:

image-20211111000716623

# 4.1 RabbitMQ配置类

新增一个配置文件类,用于新增队列 QC,也可以放在上方的配置文件类里。该队列不预设TTL时间,而是根据消息本身的TTL或者在消息发送时动态设置TTL时间。这样做的好处是可以更灵活地控制消息的延时处理时间。队列QC通过绑定关系绑定到交换机X上,同时,队列QC配置了死信交换机Y,当消息过期或者被拒绝时,会自动转发到死信队列。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置类用于创建延时队列QC和设置其死信交换机。
 */
@Configuration
public class MsgTtlQueueConfig {
    // 死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    // 新增的队列名称
    public static final String QUEUE_C = "QC";

    /**
     * 创建队列C,并配置死信交换机。
     * 该队列没有设置TTL,TTL将在消息发送时动态指定。
     */
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>();
        // 绑定死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 指定死信路由键
        args.put("x-dead-letter-routing-key", "YD");
        // 不设置TTL属性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    /**
     * 将队列C绑定到交换机X上。
     * 使用路由键"XC"进行绑定。
     */
    @Bean
    public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 4.2 Controller 新增方法

该方法接收的请求要带有 TTL 时间





























 
 
 
 
 
 
 
 
 


/**
 * Controller层提供消息发送接口。
 */
@RestController
@Slf4j 
@RequestMapping("/ttl") 
public class SendMsgController {
    
    @Autowired
    private RabbitTemplate rabbitTemplate; // 自动注入RabbitTemplate来操作RabbitMQ

    /**
     * 发送消息到TTL为10秒和40秒的队列。
     * @param message 消息内容
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable("message") String message) {
        log.info("当前时间:{}, 发送一条信息给两个TTL队列:{}", new Date(), message);
        // 发送消息到队列XA和XB,这两个队列的TTL分别配置为10秒和40秒
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列: " + message);
    }

    /**
     * 发送带有TTL的消息到队列C。
     * @param message 消息内容
     * @param ttlTime 消息的TTL时间(毫秒)
     */
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) {
        // 动态设置消息的TTL,并发送到队列XC
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
            correlationData.getMessageProperties().setExpiration(ttlTime); // 设置消息的TTL
            return correlationData;
        });
        log.info("当前时间:{}, 发送一条时长{}毫秒TTL信息给队列C:{}", new Date(), ttlTime, message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

重启下面,发送请求:

http://localhost:8808/ttl/sendExpirationMsg/你好1/20000 (opens new window)

http://localhost:8808/ttl/sendExpirationMsg/你好2/2000 (opens new window)

image-20211111002753539

总结

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时"死亡"

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

这也就是为什么如图的时间:你好 2 延时 2 秒,却后执行,还要等待你好 1 消费后再执行你好 2。

# 5. Rabbitmq插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

安装延时队列插件

可去官网下载 (opens new window)找到 rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录。

因为官网也是跳转去该插件的 GitHub 地址进行下载:点击跳转 (opens new window)

打开 Linux,用 Xftp 将插件放到 RabbitMQ 的安装目录下的 plgins 目录,

RabbitMQ 与其 plgins 目录默认分别位于

# RabbitMQ 安装目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.8   
# RabbitMQ 的 plgins 所在目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.8/plugins
1
2
3
4

其中我的mq版本是 /rabbitmq_server-3.9.8,进入目录后执行下面命令让该插件生效,然后重启 RabbitMQ

# 安装并启用延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启RabbitMQ服务
systemctl restart rabbitmq-server
1
2
3
4

如果是docker安装,mq的插件目录在容器内/opt/rabbitmq/plugins/下

# 复制宿主机内的mq插件到容器内的mq插件目录里面即可
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez a77c987b645b:/opt/rabbitmq/plugins/
# 进入容器内容启动插件,然后重启容器即可
docker exec -it 容器id bash
cd /opt/rabbitmq/plugins/
# 安装并启用延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1
2
3
4
5
6
7

解释

  • 安装命令不能出现插件版本和后缀,如 rabbitmq-plugins enable rabbitmq_delayed_message_exchange-3.9.0.ez 会报错

  • 必须是 rabbitmq-plugins enable rabbitmq_delayed_message_exchange,后面不允许填入版本和文件后缀

打开 Web 界面,查看交换机的新增功能列表,如果多出了如图所示,代表成功添加插件

image-20211111180530451

# 6. 插件实战

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

image-20211111180629701

# 6.1 配置类代码

新增一个配置类 DelayedQueueConfig,也可以放在原来的配置文件里,代码里使用了 CustomExchange 类,通过参数来自定义一个类型(direct、topic等)

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {
    // 定义延迟队列的名称
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // 定义延迟交换机的名称
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // 定义路由键
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * 创建延迟队列。
     * @return 创建的延迟队列实例。
     */
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    /**
     * 自定义延迟交换机。
     * 使用 CustomExchange 类型来创建,这是因为我们需要一个支持延迟消息的交换机。
     * @return 创建的自定义延迟交换机实例。
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        // 设置延迟交换机的类型为 direct
        args.put("x-delayed-type", "direct");
        // 创建一个类型为 "x-delayed-message" 的交换机,这个类型是插件提供的,专门用于处理延迟消息
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * 将延迟队列绑定到延迟交换机。
     * @param queue 延迟队列
     * @param delayedExchange 自定义的延迟交换机
     * @return 创建的绑定关系实例。
     */
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        // 绑定延迟队列与延迟交换机,并设置使用的路由键
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 6.2 生产者代码

在 controller 里新增一个方法,使用RabbitMQ插件实现延迟消息的生产者代码

@RestController
@Slf4j
@RequestMapping("/ttl")
public class SendMsgController {
    
    @Autowired
    private RabbitTemplate rabbitTemplate; // 注入RabbitTemplate来操作RabbitMQ

    // 延迟交换机的名称和路由键常量
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    
    // ...省略前面定义的方法

    /**
     * 发送带有延迟时间的消息到延迟队列。
     * @param message 消息内容
     * @param delayTime 延迟时间(毫秒)
     */
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable("delayTime") Integer delayTime) {
        // 发送消息
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                correlationData -> {
                    // 设置消息的延迟时间
                    correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData; // 返回配置后的消息
                });
        log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 6.3 消费者代码

使用@RabbitListener注解来监听RabbitMQ中的延迟队列,当延迟队列中有消息到达时,自动触发方法处理消息。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 延迟队列消费者类。
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    // 延迟队列名称常量
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";

    /**
     * 监听延迟队列。
     * @param message 接收到的消息体
     */
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody()); // 将消息体转换为字符串
        log.info("当前时间:{}, 收到延时队列的消息:{}", new Date().toString(), msg); // 记录日志
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

发送请求:

http://localhost:8808/ttl/sendDelayMsg/hello1/20000 (opens new window)

http://localhost:8808/ttl/sendDelayMsg/hello2/2000 (opens new window)

image-20211111181554596

可以看到哪怕 hello1 需要20秒再进入延时队列,hello2 2 秒后直接进入延时队列,无需等待 hello1

# 7. 总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

编辑此页 (opens new window)
上次更新: 2024/12/28, 18:32:08
RabbitMQ - 死信队列
RabbitMQ - 高级发布确认

← RabbitMQ - 死信队列 RabbitMQ - 高级发布确认→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式