程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • 中间件 - RabbitMQ

    • 消息队列 - 介绍
    • RabbitMQ - 介绍
    • RabbitMQ - 安装
    • RabbitMQ - 基础案例
    • RabbitMQ - 应答与发布
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
      • 1. 死信队列的概念
      • 2. 死信的来源
      • 3. 应用场景
      • 4. 死信实战
        • 4.1 消息TTL过期
        • 1. 生产者代码
        • 2. 消费者 C1 代码
        • 3. 消费者 C2 代码
        • 4.2 队列达到最大长度
        • 4.3 消息被显示拒绝
    • RabbitMQ - 延迟队列
    • RabbitMQ - 高级发布确认
    • RabbitMQ - 优先级
    • SpringBoot整合RabbitMQ
  • 消息数据库
  • 中间件 - RabbitMQ
scholar
2023-11-10
目录

RabbitMQ - 死信队列

  • 1. 死信队列的概念
  • 2. 死信的来源
  • 3. 应用场景
  • 4. 死信实战
    • 4.1 消息TTL过期
    • 4.2 队列达到最大长度
    • 4.3 消息被显示拒绝

# 1. 死信队列的概念

在RabbitMQ中,生产者(Producer)将消息发送到交换机(Exchange)。交换机根据设置的路由规则,将消息路由到一个或多个队列(Queue)。消费者(Consumer)从队列中取出消息进行消费。如果因为某些原因(如消息TTL过期、队列满、消息被拒绝等),消息无法被消费,则这些消息会被转移到死信队列(Dead Letter Queue,DLQ)中,以便进一步的处理或分析。

死信队列是用来收集因为特定原因(比如消息TTL过期、队列达到最大长度、消息被显式拒绝且不重新入队)而无法正常消费的消息的特殊队列。

# 2. 死信的来源

死信(Dead Letter)是指在消息队列中无法被正确消费的消息。在RabbitMQ中,这通常发生在以下几种情况:

  1. 消息TTL过期:TTL(Time To Live,生存时间)是指一个消息在队列中可以存在的最长时间。一旦一个消息的存活时间超过了它的TTL,它就会变成死信。这种机制可以用于实现延时消息的功能,例如,可以设置订单在30分钟内未支付就自动取消。
  2. 队列达到最大长度:队列中的消息数量达到了队列的最大容纳量。这时,新进入队列的消息会导致队列中的一部分老消息被丢弃或变成死信,这种情况下,死信队列可以用来存储这些无法进入正常队列的消息,以便后续处理。
  3. 消息被显式拒绝:消费者从队列中获取消息后,可以通过basic.reject或basic.nack命令拒绝消息,并且设置requeue=false,表示不希望RabbitMQ将这条消息重新放入队列中。这样,消息就会变成死信。

# 3. 应用场景

在电商系统中,可以使用死信队列和消息TTL来实现订单的自动取消功能。用户下单后,生成一个具有一定TTL的支付消息发送到队列中。如果用户在TTL过期前完成支付,支付成功的消息会被消费,订单继续处理。如果用户未在TTL内支付,该消息变成死信,然后被转移到死信队列中,系统通过消费死信队列中的消息来自动取消订单。

在消息频繁被拒绝的情况下,通过死信队列可以分析消息为何被拒绝,是否存在消费者处理逻辑的问题,或是消息本身的问题。

# 4. 死信实战

交换机类型是 direct,两个消费者,一个生产者,两个队列:消息队列和死信队列

image-20211110190646829

# 4.1 消息TTL过期

# 1. 生产者代码

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange"; // 定义交换机名称

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取通道
        Channel channel = RabbitMQUtils.getChannel();
        // 声明一个直接类型的交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 创建消息属性配置,设置消息TTL为10秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        
        // 循环发送10条消息
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i; // 消息内容
            // 发送消息到交换机,路由键为'zhangsan',带有TTL属性
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            System.out.println("生产者发送消息:" + message); // 控制台打印发送的消息
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2. 消费者 C1 代码

  • 先启动 消费者 C1 创建队列,启动之后关闭该消费者 模拟其接收不到消息。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 消费者C1模拟接收不到消息
 */
public class Consumer01 {
    // 普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明普通交换机和死信交换机,类型都为直接交换机(direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        // 声明死信队列
        String deadQueueName = "dead-queue";
        channel.queueDeclare(deadQueueName, false, false, false, null);
        // 将死信队列绑定到死信交换机上,并设置路由键为"lisi"
        channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");

        // 为普通队列设置死信交换机相关参数
        Map<String, Object> params = new HashMap<>();
        // 指定死信消息转发到的交换机
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 指定死信消息在死信交换机上的路由键
        params.put("x-dead-letter-routing-key", "lisi");
        
        // 声明普通队列,并且通过参数`params`将其与死信交换机关联
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        // 将普通队列绑定到普通交换机上,路由键为"zhangsan"
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");

        // 定义消息的投递回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer01 接收到消息:" + message);
        };
        // 开始消费普通队列中的消息
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

死信队列如何接收消息?

死信队列本质上和普通队列没有区别,它只是用于存放由于某些特定原因(如消息TTL过期、队列达到最大长度、消息被显式拒绝等)无法被正常消费的消息。普通队列通过设置额外的参数(x-dead-letter-exchange和x-dead-letter-routing-key)与死信交换机相关联,这样当普通队列中的消息成为死信时,就会自动转发到指定的死信交换机,再根据路由键路由到绑定的死信队列中。这个机制确保了消息的可追踪性和安全性,允许开发者对无法处理的消息进行后续的处理或分析。

测试:先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,预计无法收到的消息 10 秒后进入死信队列。然后启动生产者(producer )生产消息进行测试。

image-20211110192243321

生产者生产消息完成,10 秒后启动 C2 消费者,它消费死信队列里面的消息,如果消费成功,则代表原本 C1 消费的消息 10 秒进入了死信队列

# 3. 消费者 C2 代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 消费者C2专门消费死信队列中的消息。
 */
public class Consumer02 {
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取与RabbitMQ的连接通道
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明死信交换机,类型为直接交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        // 声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        
        // 将死信队列绑定到死信交换机,路由键为"lisi"
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        
        // 定义消息接收的回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 解析消息内容
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收到的死信消息
            System.out.println("Consumer02 接收到消息" + message);
        };
        
        // 开始消费死信队列中的消息,不需要手动ACK确认
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

效果演示

image-20211110192743276

image-20240326042035178

# 4.2 队列达到最大长度

1. 生产者发送消息:生产者发送消息到普通交换机,不设置消息TTL(生存时间),basicPublish 的第三个参数改为 null。








 
 





 





public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        // 声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 设置消息的 TTL 时间 10s
        // AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

       // 循环发送10条消息
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            // 发送消息到普通交换机,路由键为"zhangsan"
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

2. C1消费者设置队列最大长度:在消费者中设置普通队列的最大长度。如果队列中的消息数量超出了这个限制,超出部分的消息将变成死信。

启动之后关闭该消费者,模拟其接收不到消息,让消息堆压在队列中,由于我们给队列设置了最大长度,所以堆压的数量不会超过这个长度。
























 
 

















public class Consumer01 {
    // 普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明死信队列
        String deadQueueName = "dead-queue";
        channel.queueDeclare(deadQueueName, false, false, false, null);
        // 死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");

        // 设置普通队列参数,包括绑定死信交换机和设置队列最大长度
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 死信交换机
        params.put("x-dead-letter-routing-key", "lisi");  // 死信路由键
        // 设置正常队列长度的限制,例如发送 10 个消息,6 个位正常,4 个则为死信
        params.put("x-max-length", 6);

        // 声明普通队列,并绑定到普通交换机
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");
        // 设置消息接收回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer01 接收到消息" + message);
        };
        // 开始接收普通队列的消息
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

注意

因为参数改变了,所以需要把原先队列删除

2023-12-08 @scholar

3. C2 消费者代码不变(启动 C2 消费者去消费死信队列的消息)

image-20211110193547802

# 4.3 消息被显示拒绝

  1. 生产者代码: 生产者发送消息到普通交换机,与前面的生产者代码相同,不再重复。
  2. 消费者C1拒绝特定消息: 消费者C1接收普通队列中的消息,对特定消息"info5"执行显示拒绝操作,并将其转入死信队列。

1. 消费者 C1 代码































 
 
 
 
 
 
 
 
 






public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange"; // 普通交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange"; // 死信交换机名称

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明普通交换机和死信交换机,类型为直接交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        // 声明死信队列,并绑定到死信交换机
        String deadQueueName = "dead-queue";
        channel.queueDeclare(deadQueueName, false, false, false, null);
        channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");

        // 设置普通队列绑定死信队列的参数
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 指定死信交换机
        params.put("x-dead-letter-routing-key", "lisi"); // 指定死信路由键
        
        // 声明普通队列,并绑定到普通交换机
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");

        // 定义消息接收的回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                // 对"info5"消息执行拒绝操作,requeue设置为false代表不重新入队,该消息会被发送到死信队列
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                // 对其他消息执行正常的确认操作
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 开启手动应答模式消费消息
        channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

image-20211110194202134

2. C2 消费者代码不变

/**
 * 消费者C2专门消费死信队列中的消息。
 */
public class Consumer02 {
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取与RabbitMQ的连接通道
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明死信交换机,类型为直接交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        // 声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        
        // 将死信队列绑定到死信交换机,路由键为"lisi"
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        
        // 定义消息接收的回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 解析消息内容
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收到的死信消息
            System.out.println("Consumer02 接收到消息" + message);
        };
        
        // 开始消费死信队列中的消息,不需要手动ACK确认
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

效果演示:先启动消费者 C1 等待 10 秒让其先进行消费,再启动消费者 C2去消费死信队列的消息

image-20240326145035456

编辑此页 (opens new window)
上次更新: 2024/12/28, 18:32:08
RabbitMQ - 交换机
RabbitMQ - 延迟队列

← RabbitMQ - 交换机 RabbitMQ - 延迟队列→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式