RabbitMQ - 应答与发布
# 1. 消息应答
在RabbitMQ中,消息应答(acknowledgement)机制确保消息在传递过程中不会丢失。当RabbitMQ向消费者发送消息后,通常会立即将该消息标为删除。但这种做法在消费者处理长任务并突然宕机时可能导致正在处理的消息丢失,以及所有发送给该消费者的后续消息。
为了解决这个问题,消息应答机制允许消费者在成功处理完消息后显式地向RabbitMQ发送一个“已处理”的信号,此时RabbitMQ才会删除该消息。这样即使在处理消息过程中消费者发生故障,消息也不会丢失,因为它仍然被RabbitMQ保留。
# 自动应答
自动应答模式下,消息被发送给消费者后立即被认为是成功传递的。这种模式在处理高吞吐量时很有用,但它在数据安全性方面存在风险。如果消费者在确认消息之前发生故障(如连接或通道关闭),那么消息会丢失。此外,自动应答模式不限制传递给消费者的消息数量
,可能导致消费者端消息积压,进而耗尽内存资源,最终可能导致消费者进程被操作系统终止。因此,自动应答模式适用于消费者能够高效处理消息且可以接受消息丢失风险的场景。
# 手动消息应答的方法
在RabbitMQ中,手动消息应答机制允许更精细的控制消息的确认过程,确保消息在确实被处理后才从队列中移除。
1. Channel.basicAck - 肯定确认应答
basicAck(long deliveryTag, boolean multiple);
- deliveryTag:该参数是消息的唯一标识,用于指定确认哪个消息。
- multiple:该布尔值指定是否批量确认。若为
true
,则会一次性确认所有直到deliveryTag
的消息;若为false
,则只确认deliveryTag
指定的那个消息。
2. Channel.basicReject - 否定确认应答
basicReject(long deliveryTag, boolean requeue);
- deliveryTag:该参数是消息的唯一标识,用于指定确认哪个消息。
- requeue:指定被拒绝的消息是否重新入队列。若为
true
,消息将被重新排队;若为false
,消息将被丢弃或进入死信队列。 - 与
basicNack
的主要区别在于basicReject
不支持批量处理消息。
3. Channel.basicNack - 否定确认,支持批量
basicNack(long deliveryTag, boolean multiple, boolean requeue);
- deliveryTag:该参数是消息的唯一标识,用于指定确认哪个消息。
- multiple:支持批量否定确认,如果设置为
true
,则一次性拒绝所有未确认的消息直到deliveryTag
;若为false
,则只拒绝指定的消息。 - requeue:指定被拒绝的消息是否重新入队列。若为
true
,消息将被重新排队;若为false
,消息将被丢弃或进入死信队列。 basicNack
提供了basicReject
的功能,并增加了批量处理能力。
4. Channel.basicRecover - 消息恢复
basicRecover(boolean requeue);
- requeue:指定是否将未确认的消息重新入队。
true
表示消息会被重新发送到队列中,并尽可能地由其他消费者接收处理;false
可能导致消息被同一消费者重复消费。
通过手动消息应答,开发者可以在消息消费过程中有更大的控制权,选择合适的时机确认消息处理完成或者拒绝消息。这种机制特别适合处理需要较长时间的任务,或者在消费过程中需要保证消息不丢失的场景。
Multiple 参数解释:
- true:表示批量确认channel 上未应答的消息。举例来说,如果存在未确认的消息标签5, 6, 7, 8,且当前传入的
deliveryTag
为8,则5到8的消息都将被确认。 - false:表示仅确认当前指定的
deliveryTag
对应的消息,其他未确认的消息不受影响。
# 消息自动重新入队
当消费者因为连接中断(如通道关闭、连接关闭或TCP连接丢失)而未能发送ACK确认时,RabbitMQ会察觉到这些消息没有被完全处理,并会自动将它们重新放入队列中。这一机制确保了在消费者异常终止的情况下,消息不会丢失,因为RabbitMQ会尝试将这些消息重新分配给其他可用的消费者。这样,即便在消费者出现故障的情况下,系统也能保证消息的完整性和可靠性。
# 手动应答案例
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答
下面的案例演示如何实现手动应答机制,其中包括两个消费者——一个处理速度快(每秒处理一条消息),另一个处理速度慢(每10秒处理一条消息)。我们将观察当慢速消费者在处理消息时突然停止运行,正在处理的消息是否会重新进入队列,并最终被快速消费者消费。
消费者1 - 快速消费者
消费者1设置为手动应答,并模拟较短的处理时间(1秒)。这代表了能够迅速处理消息的消费者。
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Worker03 {
// 定义队列名称
private static final String TASK_QUEUE_NAME = "queue2";
public static void main(String[] args) throws Exception {
// 通过工具类获取RabbitMQ的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 打印提示信息,表示消费者已准备好接收消息
System.out.println("first 等待接收消息");
// 定义消息到达的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 从消息体中获取字符串内容
String message = new String(delivery.getBody());
try {
// 模拟处理消息需要1秒钟的时间
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打印接收到的消息
System.out.println("first 接收到消息:" + message);
// 手动发送ACK(消息确认),第二个参数为false表示确认收到单条消息,而非多条
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 定义取消消息时的回调函数
CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
// 设置手动消息确认。autoAck为false时关闭自动应答,开启手动应答
boolean autoAck = false;
// 开始消费消息。传入队列名称、自动应答标志、回调函数
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
消费者2 - 慢速消费者
消费者2与消费者1相似,但是它模拟了较长的处理时间(10秒),代表处理能力较慢的消费者。
// 在 Worker03 的基础上将 Thread.sleep(1000); 改为 Thread.sleep(10000);
效果演示和过程分析
- 生产者向队列发送消息。
- 消费者1和消费者2同时监听同一个队列,准备接收消息。
- 当消费者2正在处理一条消息(需要10秒完成)时,我们模拟它发生故障(例如,通过停止程序)。
- 由于消费者2没有对正在处理的消息发送ACK确认,RabbitMQ检测到这一点,并将该消息重新放回队列中。
- 消费者1(快速消费者)能够接收并处理被重新入队的消息。
通过这种方式,即使在消费者发生故障的情况下,消息也不会丢失,保证了消息处理的可靠性。
# 手动应答效果演示
正常情况下消息生产者发送两个消息, first 和 second 分别接收到消息并进行处理
当发送者发送消息 DD 到队列,此时是 second 来消费该消息,但是由于它处理时间较长,在还未处理完时间里停止运行,也就是说 second 还没有执行到 ack 代码的时候,second 被停掉了,此时会看到消息被 first 接收到了,说明消息 DD 被重新入队,然后分配给能处理消息的 first 处理了
# 2. RabbitMQ持久化
当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
# 队列持久化
之前我们创建的队列都是非持久化的,RabbitMQ 如果重启的化,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为true,代表开启持久化。
在消息生产者开启持久化:
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class Task03 {
private static final String TASK_QUEUE_NAME = "ack_queue"; // 队列名称
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 让队列持久化
boolean durable = true;
// 声明队列时,将durable参数设置为true,开启持久化
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息:");
while (sc.hasNext()) {
String message = sc.nextLine();
// 发布消息
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
注意
如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列
2023-12-08 @scholar
不然就会出现如下错误:
以下为控制台中持久化与非持久化队列的 UI 显示区
# 消息持久化
为了确保RabbitMQ中的消息在服务重启后不会丢失,除了需要将队列设置为持久化之外,还需要在消息生产者发布消息时开启消息的持久化。这可以通过在发送消息时,使用MessageProperties.PERSISTENT_TEXT_PLAIN
属性来实现。
在消息生产者中,当调用basicPublish
方法发送消息时,可以通过传入MessageProperties.PERSISTENT_TEXT_PLAIN
作为消息的属性参数,这样RabbitMQ就知道需要将这个消息保存到磁盘上,从而实现消息的持久化。
public class Task03 {
private static final String TASK_QUEUE_NAME = "ack_queue"; // 队列名称
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 声明一个持久化队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息:");
while (sc.hasNext()) {
String message = sc.nextLine();
// 发布持久化消息
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
注意事项
- 虽然将消息标记为持久化能够让RabbitMQ尝试将消息保存到磁盘,但这个过程并不是绝对可靠的。当消息被标记为持久化后,RabbitMQ会将消息存储到磁盘,但是存在一个短暂的时间窗口,在这个时间窗口内,消息可能仍然只在内存中,尚未完全写入磁盘。因此,即使使用了消息持久化,也不能保证消息在极端情况下绝对不会丢失。
- 为了进一步提高消息的持久化保证,可以结合使用持久化队列和持久化消息,并确保RabbitMQ的写入磁盘操作已经完成。此外,还可以利用RabbitMQ的事务功能或者发布确认(publisher confirms)机制来增加消息传递的可靠性。
# 3. 不公平分发
# 介绍
在RabbitMQ的消息分发机制中,默认采用的是轮询(Round-Robin)策略,这种策略在多个消费者性能均衡的情况下工作得很好。然而,在实际应用中,消费者处理消息的速度往往是不一样的。例如,消费者1可能迅速处理完任务,而消费者2处理同样的任务却需要更长的时间。如果仍然使用轮询分发策略,就会导致一些消费者长时间处于繁忙状态,而一些消费者则大部分时间处于空闲状态,这种情况下,轮询分发就不再是最优选择。
为了更高效地利用消费者的处理能力,可以采用不公平分发(Unfair Dispatch)的策略。这种策略通过设置预取计数来实现。预取计数是指RabbitMQ将同时给一个消费者发送的消息数。当设置预取计数为1时,RabbitMQ就不会在同一时间向消费者发送多于一个的消息。换句话说,只有当消费者完成了当前的消息处理并发出了ACK之后,RabbitMQ才会向该消费者发送新的消息。
在消费者中消费消息之前,设置参数 channel.basicQos(1)
来实现消息的不公平分发:
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Worker03 {
// 定义队列名称
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
// 通过工具类获取与RabbitMQ的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 控制台打印,表示消费者正在等待接收消息
System.out.println("first 等待接收消息处理时间较短");
// 定义消息接收的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 从消息载体中获取消息内容
String message = new String(delivery.getBody());
try {
// 模拟消息处理时间,此处为1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 控制台打印接收到的消息
System.out.println("接收到消息:" + message);
// 手动发送ACK(消息确认),通知RabbitMQ消息已被处理
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 定义取消消费的回调
CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
// 开启不公平分发模式
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// 关闭自动应答模式,开启手动应答。这是不公平分发策略的关键设置
boolean autoAck = false;
// 开始消费消息
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
开启成功,会看到如下结果:
不公平分发核心思想
- 当某个消费者的工作队列中还有未处理完或未确认(acknowledged)的消息时,RabbitMQ不会向该消费者分配新的消息。相反,RabbitMQ会将新消息分配给那些已经完成当前消息处理或当前空闲的消费者。
- 这意味着处理速度快的消费者能够接收更多的消息,而处理速度慢的消费者不会因为积累太多未处理的消息而过载,从而提高了整体的处理效率和响应速度。
如果所有消费者都忙于处理消息,且新消息不断到来的情况下,可能会出现队列消息堆积的情况。此时,为了避免队列被撑满,可以考虑增加更多的消费者(worker)来分担工作负载,或者改变消息的存储策略来应对这种情况。
# 效果演示
生产者生产多个消息,两个消费者的消费时间不同,则消费消息的次数也不同
# 4. 预取值分发
# 介绍
带权的消息分发
默认消息的发送是异步发送的,所以在任何时候,channel 上不止只有一个消息来自消费者的手动确认,所以本质上是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos
方法设置「预取计数」值来完成的。
该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。
通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。
public class worker03 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
// 获取与RabbitMQ服务器的通信通道
Channel channel = RabbitMQUtils.getChannel();
System.out.println("c1 等待接收消息处理时间5s");
// 定义消息接收的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将接收到的消息体转换为字符串
String message = new String(delivery.getBody());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到消息:" + message);
/**
* 1.消息标记 tag
* 2.是否批量应答未应答的消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 定义消费者取消消费的回调函数
CancelCallback cancelCallback = (s) -> {
System.out.println(s + "消费者取消消费接口回调逻辑");
};
// 不公平分发
// int prefetchCount = 1;
// channel.basicQos(prefetchCount);
// 当值不等于 1,则代表预取值
int prefetchCount = 2;
channel.basicQos(prefetchCount);
// 开启手动应答模式,关闭自动应答
boolean autoAck = false;
// 开始消费消息,参数依次是队列名、是否自动应答、消息接收的回调函数和消费者取消消费的回调函数
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
笔记
- 在RabbitMQ中,"持有的未确认消息的数量"指的是消费者可以接收但尚未发送ACK确认的消息的最大数量。这个机制是通过设置预取值来控制的。
- 不公平分发和预取值分发都用到
basicqos
方法,如果取值为 1,代表不公平分发,取值不为1,代表预取值分发。
2023-12-08 @scholar
# 效果演示
# 5. 发布确认
生产者发布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」给生产者,这样生产者才知道自己生产的消息成功发布出去。
# 6. 发布确认逻辑
在RabbitMQ中,发布确认逻辑是一种确保消息可靠传递的机制,通过将信道设置为confirm模式,生产者可以得到关于消息是否成功到达目的地的异步通知。
确认模式的工作原理:
启用Confirm模式:生产者在信道上启用confirm模式。一旦进入该模式,信道上所有发布的消息都会被赋予一个唯一的ID(从1开始)。
消息投递:消息被投递到所有匹配的队列后,RabbitMQ会向生产者发送一个确认(ACK)消息,该确认消息包含了消息的唯一ID,从而通知生产者消息已经成功到达目标队列。
持久化确认:消息和队列必须被设置为可持久化,确认消息会在消息
成功写入磁盘后
发出,确保消息的持久存储。批量确认:RabbitMQ的确认消息中的
delivery-tag
字段包含了被确认消息的序列号。此外,RabbitMQ可以通过设置ACK消息的multiple
字段为true
来指示该序列号及之前的所有消息都已被确认。异步确认:发布确认机制是异步的。生产者发送消息后,可以不用等待当前消息的确认就继续发送下一条消息。一旦消息被确认,生产者将通过回调方法得到通知。
处理消息丢失:如果由于RabbitMQ内部错误导致消息丢失,RabbitMQ会发送一个否定确认(NACK)消息给生产者。生产者可以通过在回调方法中处理NACK消息来响应消息丢失事件。
优点:
- 提高吞吐量:由于是异步操作,生产者不需要等待每条消息的确认就能继续发送消息,这样可以显著提高消息发布的速率。
- 可靠性保证:确认机制保证了生产者能够知道每条消息是否成功到达队列,增加了消息传递的可靠性。
- 错误处理:通过回调方法处理ACK和NACK,生产者能够对成功和失败的消息传递做出响应,实现错误处理和消息重发。
使用发布确认逻辑,生产者能够有效地管理消息的生命周期,确保消息的可靠传递,同时保持高效的消息处理性能。
# 7. 发布确认的策略
在RabbitMQ中,发布确认是一种确保消息从生产者安全到达队列的机制。默认情况下,发布确认是关闭的。如果你需要使用这个特性来增加消息传递的可靠性,你必须在通道(Channel)上启用发布确认。
# 开启发布确认的方法
在通道上调用confirmSelect
方法可以开启发布确认。告诉RabbitMQ后续在该通道上发布的所有消息都需要被确认。
//开启发布确认
channel.confirmSelect();
2
# 单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* 演示单个确认发布模式。
* 这种模式下,每发送一条消息就等待RabbitMQ的确认,确保消息被可靠投递。
*/
public class ConfirmMessage {
// 定义要发送的消息数量
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 通过工具类获取与RabbitMQ的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 随机生成队列名称,并声明一个新的队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
// 开启发布确认模式
channel.confirmSelect();
// 记录开始时间,用于计算整个发布过程的耗时
long begin = System.currentTimeMillis();
// 循环发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
// 发布消息到随机生成的队列中
channel.basicPublish("", queueName, null, message.getBytes());
// 等待服务端的确认响应,如果服务端在指定时间内未确认,可以考虑消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
// 当消息得到确认时,打印消息发送成功的信息
System.out.println("消息发送成功");
}
}
// 记录结束时间
long end = System.currentTimeMillis();
// 打印发送完所有消息所耗费的总时间
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
注意确认发布指的是成功发送到了队列,并不是消费者消费了消息。
# 批量确认发布
批量确认发布是RabbitMQ中用于提高消息发布吞吐量的一种策略,与单个确认发布使用的API一样的,关键的区别在于它们确认消息的时机和方式不同。
- 单个确认发布:每发送一条消息后,就立即等待RabbitMQ的确认(ACK)。这意味着生产者在每条消息发送并被确认之前,都会暂停发送下一条消息,直到收到当前消息的确认。这种方式虽然可以确保每条消息都被准确地跟踪和确认,但会因为频繁等待确认而导致整体吞吐量较低。
- 批量确认发布:生产者先连续发送一批消息,直到达到预设的批量大小后,再一次性等待这批消息的确认(ACK)。这样做的好处是显著减少了等待确认的次数,从而提高了消息发布的吞吐量。生产者不需要在每条消息发送后都暂停等待确认,而是在发送了多条消息后才暂停等待这批消息的确认。
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* 实现RabbitMQ的批量确认发布机制。
* 与单个确认发布相比,批量确认可以显著提高消息发布的吞吐量。
*/
public class ConfirmMessage2 {
// 定义要发送的消息总数
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 从工具类中获取与RabbitMQ的连接通道
Channel channel = RabbitMQUtils.getChannel();
// 随机生成一个队列名,并声明一个队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
// 开启发布确认模式
channel.confirmSelect();
// 记录开始时间
long begin = System.currentTimeMillis();
// 设置每批次确认消息的大小
int batchSize = 100;
// 初始化未确认消息的计数器
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
// 构建消息
String message = i + "";
// 发布消息到队列
channel.basicPublish("", queueName, null, message.getBytes());
// 未确认的消息数量加1
outstandingMessageCount++;
// 当发布的消息数量达到批量大小时,进行一次批量确认,接收到mq给予明确的确认(ACK)后再继续执行
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0; // 重置未确认消息计数器
}
}
// 最后确认是否还有未确认的消息,确保所有消息都被确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
// 记录结束时间,并计算发送消息的总耗时
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
优缺点分析
优点:
- 提高吞吐量:批量确认发布极大地提高了消息发布的吞吐量,特别适用于需要高速处理大量消息的场景。
- 效率高:减少了网络通信的次数,生产者可以更快地发送下一批消息。
缺点:
- 故障定位困难:如果在一批消息中有消息因为故障未能被RabbitMQ确认,难以定位具体是哪一条消息出了问题。因此,需要在内存中记录每批消息的详细信息,以便出现问题时能重新发布。
- 仍是同步操作:尽管提高了吞吐量,但批量确认发布仍然是一种同步操作,生产者在等待确认时会阻塞后续消息的发送。
应用场景
批量确认发布方式适合于对消息传递速度要求高,而对单条消息确认精确度要求相对较低的应用场景。在这种模式下,虽然可能需要额外处理消息重发的逻辑,但整体上能显著提高生产者的消息处理能力,尤其是在处理大量消息的情况下。开发者需要根据实际应用场景的特点和需求,权衡使用单个确认还是批量确认的发布方式。
# 异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高。它通过注册回调函数来异步地处理消息确认,这意味着生产者在发送消息后无需阻塞等待每条消息的确认,从而可以同时继续进行消息发送和处理确认或否定确认的回调,大大提高了消息发布的吞吐量。下面详细讲解异步确认是怎么实现的。
添加回调函数,在回调函数里进行确认发布
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* 异步确认发布示例
* 利用回调函数来确认消息是否成功传递到队列,提高消息传递的性能和可靠性。
*/
public class ConfirmMessage3 {
public static final int MESSAGE_COUNT = 1000; // 定义要发送的消息总数
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel(); // 获取通道
String queueName = UUID.randomUUID().toString(); // 随机生成队列名称
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认模式
channel.confirmSelect();
// 定义消息确认(ACK)的回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("确认的消息序列号:" + deliveryTag); // 打印被确认的消息序列号
};
// 定义消息未确认(NACK)的回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("未确认的消息序列号:" + deliveryTag); // 打印未确认的消息序列号
};
// 添加异步确认的监听器,分别指定了确认和未确认的回调函数
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis(); // 记录开始时间
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i; // 构造消息内容
channel.basicPublish("", queueName, null, message.getBytes()); // 发布消息
}
long end = System.currentTimeMillis(); // 记录结束时间
// 打印发送消息的耗时
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
/**
* 演示异步确认发布的实现方式,适合高吞吐量的消息发送场景。
*/
public class ConfirmMessage3 {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws IOException, TimeoutException {
// 获取通道
Channel channel = RabbitMQUtils.getChannel();
// 声明队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认模式
channel.confirmSelect();
// 使用ConcurrentSkipListMap来存储所有已发送但未确认的消息,确保线程安全和高效的操作
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// 定义消息确认(ACK)的回调处理
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
// 这里deliveryTag是消息发布确认成功的序列号
if (multiple) {
// 通过deliveryTag序列号从map里面取出所有发布确认成功的消息并删除,剩下的就是发布确认失败的消息
// 如果multiple为true,表示批量确认,那么就删除所有小于等于deliveryTag的消息
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
confirmed.clear();
} else {
// 如果multiple为false,只确认当前的deliveryTag
outstandingConfirms.remove(deliveryTag);
}
};
// 定义未确认(NACK)的回调处理
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
// 这里deliveryTag是消息发布失败成功的序列号,获取未确认消息的内容
String message = outstandingConfirms.get(deliveryTag);
System.out.println("发布的消息" + message + "未被确认,序列号:" + deliveryTag);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
// 在发送消息之前,先记录下该消息的序列号和内容
// channel.getNextPublishSeqNo()预先获取这条消息将要被分配的序列号
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
// 发送消息
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Make sure to add code blocks to your code group
异步确认的过程
- 当调用
channel.confirmSelect()
开启发布确认模式,并通过channel.basicPublish
发送消息后,RabbitMQ会自动处理这些消息。 - 异步确认是通过
channel.addConfirmListener(ackCallback, nackCallback);
实现的。该监听器注册了两个回调函数——ackCallback
和nackCallback
,分别用于处理消息确认(ACK)和未确认(NACK)的情况。这两个回调函数是异步执行的,即它们在RabbitMQ确认消息后自动触发,而不需要阻塞或等待。
如何处理异步未确认消息?
- 存储机制:使用一个线程安全的队列,如
ConcurrentLinkedQueue
,来存储未确认的消息。这个队列可以从发布线程中添加消息,并在确认回调中移除发布确认成功的消息。 - 消息跟踪:当消息被发布时,将其信息加入到这个队列中。这样做的目的是为了能够追踪哪些消息已发送但还未得到RabbitMQ的确认。
- 确认处理:在消息的确认回调中,通过检索和移除队列中对应的消息来处理确认。如果是批量确认的情况,可能需要一次性移除多个消息。
- 未确认消息的处理:如果收到了未确认的回调(即NACK),则需要从队列中找到对应的消息并进行处理,处理方式可能包括重试发送、记录日志或触发警报等。
# 以上 3 种发布确认速度对比
单独发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
异步处理
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
# 8. 应答和发布区别
应答功能属于消费者,消费完消息告诉 RabbitMQ 已经消费成功。
发布功能属于生产者,生产消息到 RabbitMQ,RabbitMQ 需要告诉生产者已经收到消息。