RabbitMQ - 死信队列
# 1. 死信队列的概念
在RabbitMQ中,生产者(Producer)将消息发送到交换机(Exchange)。交换机根据设置的路由规则,将消息路由到一个或多个队列(Queue)。消费者(Consumer)从队列中取出消息进行消费。如果因为某些原因(如消息TTL过期、队列满、消息被拒绝等),消息无法被消费,则这些消息会被转移到死信队列(Dead Letter Queue,DLQ)中,以便进一步的处理或分析。
死信队列是用来收集因为特定原因(比如消息TTL过期、队列达到最大长度、消息被显式拒绝且不重新入队)而无法正常消费的消息的特殊队列。
# 2. 死信的来源
死信(Dead Letter)是指在消息队列中无法被正确消费的消息。在RabbitMQ中,这通常发生在以下几种情况:
- 消息TTL过期:TTL(Time To Live,生存时间)是指一个消息在队列中可以存在的最长时间。一旦一个消息的存活时间超过了它的TTL,它就会变成死信。这种机制可以用于实现延时消息的功能,例如,可以设置订单在30分钟内未支付就自动取消。
- 队列达到最大长度:队列中的消息数量达到了队列的最大容纳量。这时,新进入队列的消息会导致队列中的一部分老消息被丢弃或变成死信,这种情况下,死信队列可以用来存储这些无法进入正常队列的消息,以便后续处理。
- 消息被显式拒绝:消费者从队列中获取消息后,可以通过
basic.reject
或basic.nack
命令拒绝消息,并且设置requeue=false
,表示不希望RabbitMQ将这条消息重新放入队列中。这样,消息就会变成死信。
# 3. 应用场景
在电商系统中,可以使用死信队列和消息TTL来实现订单的自动取消功能。用户下单后,生成一个具有一定TTL的支付消息发送到队列中。如果用户在TTL过期前完成支付,支付成功的消息会被消费,订单继续处理。如果用户未在TTL内支付,该消息变成死信,然后被转移到死信队列中,系统通过消费死信队列中的消息来自动取消订单。
在消息频繁被拒绝的情况下,通过死信队列可以分析消息为何被拒绝,是否存在消费者处理逻辑的问题,或是消息本身的问题。
# 4. 死信实战
交换机类型是 direct,两个消费者,一个生产者,两个队列:消息队列和死信队列
# 4.1 消息TTL过期
# 1. 生产者代码
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange"; // 定义交换机名称
public static void main(String[] args) throws IOException, TimeoutException {
// 获取通道
Channel channel = RabbitMQUtils.getChannel();
// 声明一个直接类型的交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 创建消息属性配置,设置消息TTL为10秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 循环发送10条消息
for (int i = 1; i <= 10; i++) {
String message = "info" + i; // 消息内容
// 发送消息到交换机,路由键为'zhangsan',带有TTL属性
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:" + message); // 控制台打印发送的消息
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 2. 消费者 C1 代码
- 先启动 消费者 C1 创建队列,启动之后关闭该消费者 模拟其接收不到消息。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* 消费者C1模拟接收不到消息
*/
public class Consumer01 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
// 声明普通交换机和死信交换机,类型都为直接交换机(direct)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueueName = "dead-queue";
channel.queueDeclare(deadQueueName, false, false, false, null);
// 将死信队列绑定到死信交换机上,并设置路由键为"lisi"
channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");
// 为普通队列设置死信交换机相关参数
Map<String, Object> params = new HashMap<>();
// 指定死信消息转发到的交换机
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 指定死信消息在死信交换机上的路由键
params.put("x-dead-letter-routing-key", "lisi");
// 声明普通队列,并且通过参数`params`将其与死信交换机关联
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
// 将普通队列绑定到普通交换机上,路由键为"zhangsan"
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
// 定义消息的投递回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer01 接收到消息:" + message);
};
// 开始消费普通队列中的消息
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
死信队列如何接收消息?
死信队列本质上和普通队列没有区别,它只是用于存放由于某些特定原因(如消息TTL过期、队列达到最大长度、消息被显式拒绝等)无法被正常消费的消息。普通队列通过设置额外的参数(x-dead-letter-exchange
和x-dead-letter-routing-key
)与死信交换机相关联,这样当普通队列中的消息成为死信时,就会自动转发到指定的死信交换机,再根据路由键路由到绑定的死信队列中。这个机制确保了消息的可追踪性和安全性,允许开发者对无法处理的消息进行后续的处理或分析。
测试:先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,预计无法收到的消息 10 秒后进入死信队列。然后启动生产者(producer )生产消息进行测试。
生产者生产消息完成,10 秒后启动 C2 消费者,它消费死信队列里面的消息,如果消费成功,则代表原本 C1 消费的消息 10 秒进入了死信队列
# 3. 消费者 C2 代码
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 消费者C2专门消费死信队列中的消息。
*/
public class Consumer02 {
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取与RabbitMQ的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 声明死信交换机,类型为直接交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 将死信队列绑定到死信交换机,路由键为"lisi"
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
// 定义消息接收的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 解析消息内容
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
// 打印接收到的死信消息
System.out.println("Consumer02 接收到消息" + message);
};
// 开始消费死信队列中的消息,不需要手动ACK确认
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
效果演示
# 4.2 队列达到最大长度
1. 生产者发送消息:生产者发送消息到普通交换机,不设置消息TTL(生存时间),basicPublish
的第三个参数改为 null。
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
// 声明普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置消息的 TTL 时间 10s
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 循环发送10条消息
for (int i = 1; i <= 10; i++) {
String message = "info" + i;
// 发送消息到普通交换机,路由键为"zhangsan"
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2. C1消费者设置队列最大长度:在消费者中设置普通队列的最大长度。如果队列中的消息数量超出了这个限制,超出部分的消息将变成死信。
启动之后关闭该消费者,模拟其接收不到消息,让消息堆压在队列中,由于我们给队列设置了最大长度,所以堆压的数量不会超过这个长度。
public class Consumer01 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueueName = "dead-queue";
channel.queueDeclare(deadQueueName, false, false, false, null);
// 死信队列绑定:队列、交换机、路由键(routingKey)
channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");
// 设置普通队列参数,包括绑定死信交换机和设置队列最大长度
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 死信交换机
params.put("x-dead-letter-routing-key", "lisi"); // 死信路由键
// 设置正常队列长度的限制,例如发送 10 个消息,6 个位正常,4 个则为死信
params.put("x-max-length", 6);
// 声明普通队列,并绑定到普通交换机
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
// 设置消息接收回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer01 接收到消息" + message);
};
// 开始接收普通队列的消息
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
注意
因为参数改变了,所以需要把原先队列删除
2023-12-08 @scholar
3. C2 消费者代码不变(启动 C2 消费者去消费死信队列的消息
)
# 4.3 消息被显示拒绝
- 生产者代码: 生产者发送消息到普通交换机,与前面的生产者代码相同,不再重复。
- 消费者C1拒绝特定消息: 消费者C1接收普通队列中的消息,对特定消息"info5"执行显示拒绝操作,并将其转入死信队列。
1. 消费者 C1 代码
public class Consumer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange"; // 普通交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange"; // 死信交换机名称
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
// 声明普通交换机和死信交换机,类型为直接交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列,并绑定到死信交换机
String deadQueueName = "dead-queue";
channel.queueDeclare(deadQueueName, false, false, false, null);
channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");
// 设置普通队列绑定死信队列的参数
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 指定死信交换机
params.put("x-dead-letter-routing-key", "lisi"); // 指定死信路由键
// 声明普通队列,并绑定到普通交换机
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
// 定义消息接收的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
if (message.equals("info5")) {
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
// 对"info5"消息执行拒绝操作,requeue设置为false代表不重新入队,该消息会被发送到死信队列
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01 接收到消息" + message);
// 对其他消息执行正常的确认操作
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开启手动应答模式消费消息
channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2. C2 消费者代码不变
/**
* 消费者C2专门消费死信队列中的消息。
*/
public class Consumer02 {
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取与RabbitMQ的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 声明死信交换机,类型为直接交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 将死信队列绑定到死信交换机,路由键为"lisi"
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
// 定义消息接收的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 解析消息内容
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
// 打印接收到的死信消息
System.out.println("Consumer02 接收到消息" + message);
};
// 开始消费死信队列中的消息,不需要手动ACK确认
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
效果演示:先启动消费者 C1 等待 10 秒让其先进行消费,再启动消费者 C2去消费死信队列的消息