RabbitMQ - 基础案例
# 1. Hello RabbitMQ
用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者
在下图中,“ P” 是我们的生产者,“ C” 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区
笔记
Java 进行连接的时候,需要 Linux 开放 5672 端口,否则会连接超时
访问 Web 界面的端口是 15672,连接服务器的端口是 5672
2023-12-08 @scholar
步骤图:
# 1.1 添加依赖
先创建好 Maven 工程,pom.xml 添入依赖(版本根据需求选择):
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 1.2 消息生产者
在RabbitMQ中,消息生产者的主要任务是创建消息并发送到队列中。下面是一个典型的生产者实现步骤:
- 创建连接工厂:首先需要创建一个
ConnectionFactory
的实例,用于配置RabbitMQ Server的连接信息,包括主机地址、端口、用户名和密码。 - 配置RabbitMQ工厂信息:通过
ConnectionFactory
设置RabbitMQ服务端的地址、端口以及访问该服务器所需的用户名和密码。 - 创建连接:利用配置好的
ConnectionFactory
创建与RabbitMQ的TCP连接。 - 创建信道:通过已建立的连接创建一个信道(Channel)。信道是执行大多数RabbitMQ操作的地方,比如声明队列和发送消息。
- 声明队列:在信道上声明一个队列,如果RabbitMQ服务器上不存在指定名称的队列,将会创建它。
- 发送消息:将消息发布到指定的队列中。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
// 定义队列名称
public static final String QUEUE_NAME = "queue1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 配置RabbitMQ连接信息
connectionFactory.setHost("192.168.199.27"); // 设置RabbitMQ服务地址
connectionFactory.setPort(5672); // 设置RabbitMQ服务端口
connectionFactory.setUsername("admin"); // 设置访问RabbitMQ的用户名
connectionFactory.setPassword("123456"); // 设置访问密码
// 3. 创建连接
Connection connection = connectionFactory.newConnection();
// 4. 创建信道
Channel channel = connection.createChannel();
// 5. 声明(创建)队列
// 数依次是队列名、是否持久化、是否独占队列、是否自动删除和其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 准备消息内容
String message = "Hello RabbitMQ";
// 6. 发送消息到队列
// 参数依次是交换机名、队列名、消息的其他属性、消息内容的字节数组
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 打印消息已发送的提示信息
System.out.println("消息发送完毕");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 1. 声明队列
声明队列是在RabbitMQ中创建队列的过程,如果队列不存在,RabbitMQ会创建一个默认的队列。声明队列的方法如下:
channel.queueDeclare(队列名, 持久化, 共享消费, 自动删除, 配置参数);
- 队列名(String):这是队列的唯一标识符。
- 持久化:如果设置为
true
,则队列会在RabbitMQ重启后保留。消息持久化可以保证消息不会因为RabbitMQ的重启而丢失。 - 共享消费:如果设置为
true
,则允许多个消费者从同一个队列中获取消息。这对于负载均衡和提高消息处理率很有帮助。 - 自动删除:如果设置为
true
,当最后一个消费者断开连接后队列会自动删除。 - 配置参数(Map):用于设置队列的其他属性,如死信队列、队列优先级等。
# 2. 配置参数示例
Map<String, Object> params = new HashMap<>();
params.put("x-max-priority", 10); // 设置队列的最大优先级
params.put("x-dead-letter-exchange", "dead_letter_exchange"); // 声明当前队列绑定的死信交换机
params.put("x-dead-letter-routing-key", "dead_routing_key"); // 声明当前队列的死信路由key
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
2
3
4
5
# 3. 发布消息
将消息发布到队列的操作:
channel.basicPublish(交换机名, 路由键, 配置参数, 消息体);
- 交换机名:指定消息要发送到的交换机。如果是空字符串
""
,则表示使用RabbitMQ的默认交换机。 - 路由键:用于交换机将消息路由到指定的队列。
- 配置参数:设置消息的其他属性,如消息优先级、消息标识符等。
- 消息体:发送的消息内容,通常为字节数组。
发布消息的配置参数示例
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() // 配置参数
.priority(10) // 设置消息的优先级
.messageId("1") // 设置消息的ID
.build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
2
3
4
5
通过上述方法和配置,生产者可以灵活地控制消息的发送、队列的创建和消息的属性,实现复杂的消息分发和处理逻辑。
# 1.3 消息消费者
在RabbitMQ中,消息消费者的任务是接收并处理生产者发送到队列中的消息。
- 创建连接工厂:实例化一个
ConnectionFactory
对象,用于配置连接到RabbitMQ服务器的参数。 - 配置连接工厂:设置RabbitMQ服务器的地址、用户名和密码。
- 创建连接:使用配置好的
ConnectionFactory
建立与RabbitMQ服务器的连接。 - 创建信道:通过建立的连接开启一个信道,用于监听队列中的消息。
- 设置消费回调:定义如何处理接收到的消息。
- 启动消费:通过
basicConsume
方法启动消息消费,指定队列名称、是否自动确认消息、消费者回调和取消消费的回调。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 队列名称
public static final String QUEUE_NAME = "queue1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 设置RabbitMQ服务地址和认证信息
connectionFactory.setHost("192.168.199.27");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 3. 创建连接
Connection connection = connectionFactory.newConnection();
// 4. 创建信道
Channel channel = connection.createChannel();
System.out.println("等待接收消息......");
// 5. 设置消息接收回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到的消息:" + message);
};
// 6. 设置取消消费的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
// 7. 开始消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
basicConsume
方法是消费者用来监听队列消息并进行处理的关键方法。该方法订阅一个队列,以异步的方式接收发送到该队列的消息。
channel.basicConsume(队列名字/String, 是否自动签收/boolean, 消费时的回调/接口类, 无法消费的回调/接口类);
- 队列名字(String):这是你想要监听的队列名称,确保此队列已在RabbitMQ中声明。
- 是否自动签收(boolean):这个参数决定了消息的确认模式。如果设置为
true
,RabbitMQ会在消息被成功传递给消费者后立即认为该消息已经成功消费,自动进行消息确认。如果设置为false
,则需要消费者在处理完消息后显式调用确认方法,这允许更细粒度的控制消息处理流程,比如实现消息的重试机制。 - 消费时的回调(DeliverCallback接口):这是一个接口回调,当消费者接收到消息时,RabbitMQ客户端库将调用这个回调函数。你需要实现
DeliverCallback
接口,处理接收到的消息。这个回调函数包含了消息的所有详细信息,包括消息体。 - 无法消费的回调(CancelCallback接口):这是另一个接口回调,当消费者由于某些原因无法继续消费消息时(如队列被删除),RabbitMQ客户端库将调用这个回调函数。需要实现
CancelCallback
接口来处理这种取消消费的情况。
# 2. Work Queues
工作队列(Work Queues),在RabbitMQ中,是一种用于分布式任务处理的模式。主要目的是为了避免即时执行一些资源密集型任务,从而导致必须等待任务完成的情况。相反,任务被封装为消息,发送到队列中,然后由一个或多个在后台运行的工作进程(工作队列)来处理这些任务。这种模式可以显著提高应用程序的处理效率和响应速度。
工作队列的核心思想
- 任务延迟执行:将即时执行的任务转换为可以稍后处理的消息,通过异步处理方式来提高系统的整体效率和响应性。
- 分布式任务处理:多个工作进程可以分布在不同的服务器或容器上,共同参与任务的处理,实现负载均衡和高可用性。
- 消息封装:任务以消息的形式存在,这些消息被发送到队列中,由工作进程从队列中取出并执行。
# 2.1 轮询消费机制
在RabbitMQ中,默认情况下,当有多个消费者(工作队列)监听同一个队列时,RabbitMQ会按照轮询(Round-Robin)的方式分发消息。也就是说,队列中的每个消息会按顺序被发送给下一个消费者,直到所有消费者都接收到了消息,然后再从第一个消费者开始重复这个过程。这种方式确保了消息分发的公平性,每个消费者都有机会平等地处理消息。
# 2.2 轮询案例
在RabbitMQ中实现轮询消费的案例中,我们会创建两个工作队列,这两个队列会从同一个RabbitMQ队列中获取并消费消息。为了实现这个案例,我们首先需要封装RabbitMQ的配置参数为一个工具类RabbitMQUtils
,然后创建两个消费者(工作队列),它们会根据RabbitMQ的默认行为,轮流消费队列中的消息。
# 1. 封装RabbitMQ配置的工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 连接 RabbitMQ 的工具类
*/
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置RabbitMQ服务器地址
connectionFactory.setHost("192.168.199.27");
// 设置用户名
connectionFactory.setUsername("admin");
// 设置密码
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信道并返回
return connection.createChannel();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 2. 创建工作队列消费者
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作线程,相当于消费者
*/
public class Worker01 {
public static final String QUEUE_NAME = "queue2";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
// 声明(创建)队列
// 数依次是队列名、是否持久化、是否独占队列、是否自动删除和其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息接收的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
// 消息被取消时的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + " 消费者取消消费接口回调逻辑");
};
System.out.println("first 消费者启动等待消费.................. ");
// 开始消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
步骤总结
- 创建
RabbitMQUtils
工具类:封装了创建与RabbitMQ连接及信道的代码,使得在多个地方可以重复使用这段逻辑,简化了代码的编写。 - 实现工作队列消费者:通过
RabbitMQUtils.getChannel()
获取信道,设置消费者接收消息和取消消费的回调函数。通过channel.basicConsume
开始监听指定队列中的消息。 - 轮询消费机制:启动两个或多个此消费者的实例(如
Worker01
),它们会自动按照RabbitMQ的默认轮询机制平均分配队列中的消息。 - 消费消息:消费者通过回调函数接收和处理消息,然后简单地打印了接收到的消息内容。
创建好一个工作队列后,只需要以多线程方式启动两次该 main 函数即可,以 first、second 区别消息队列。
要开启多线程功能,首先启动该消息队列,然后如图开启多线程:选中 Allow multiple instances
。
两个工作队列都启动后
# 3. 创建生产者发送消息
/**
* 生产者类,用于发送消息到RabbitMQ队列。
*/
public class Task01 {
// 定义要发送消息的队列名称
public static final String QUEUE_NAME = "queue2";
public static void main(String[] args) throws Exception {
// 获取RabbitMQ通道,使用工具类RabbitMQUtils来简化连接和通道的创建过程
Channel channel = RabbitMQUtils.getChannel();
// 创建Scanner对象,用于从命令行读取输入的消息
Scanner scanner = new Scanner(System.in);
// 使用hasNext()方法检查是否有输入,如果有,循环会继续
while (scanner.hasNext()) {
// 读取一行输入作为消息
String message = scanner.next();
// 使用basicPublish方法发送消息到RabbitMQ
// 参数说明:交换机名,队列名称,配置参数,消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 在控制台打印发送完成的消息
System.out.println("消息发送完成:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
结果演示:通过程序执行发现生产者总共发送 4 个消息,消费者 first 和消费者 second 分别分得两个消息,并且是按照有序的一个接收一次消息
# 3. RabbitMQ Web页面基本使用
# 3.1 队列页面
队列是RabbitMQ的基本建筑块之一,用于存储消息直到它们被消费。在Web管理界面的Queues
部分,你可以查看所有队列的列表,创建新队列,以及管理现有队列。
英文名称 | 中文名称 | 描述 |
---|---|---|
Name | 名称 | 队列的名称。 |
Type | 类型 | 队列的类型,比如classic 。 |
Features | 特性 | 队列的特性,如是否持久化(Durable)、是否排他(Exclusive)、是否自动删除(Auto delete)。 |
State | 状态 | 队列的当前状态,如idle 表示空闲。 |
Ready | 就绪消息数 | 准备就绪等待消费的消息数量。 |
Unacked | 未确认消息数 | 已经被消费但还没有被确认的消息数量。 |
Total | 总消息数 | 队列中的消息总数(包括就绪和未确认的消息)。 |
incoming | 消息流入率 | 新消息流入队列的速率。 |
deliver / get | 消息投递率 | 消息从队列投递给消费者的速率。 |
ack | 确认率 | 消息被确认的速率。 |
队列类型
Classic Queues
是RabbitMQ的传统队列类型,提供广泛的特性支持,如消息持久化和优先级,适用于多种通用场景。它们可以配置为镜像队列以提高高可用性,但可能会因数据同步影响性能。Quorum Queues
基于Raft协议设计,自RabbitMQ 3.8版本引入,旨在提供更高的数据一致性和性能,特别是在处理网络分区和节点故障时。它们默认在多个节点之间复制数据,提高了消息的持久性和队列的高可用性,但不支持某些Classic队列的特性。
# 3.2 交换机页面
交换机负责接收生产者发送的消息并根据规则路由到一个或多个队列。在Exchanges
部分,你可以创建新的交换机,查看现有交换机,以及管理它们的配置。
英文名称 | 中文名称 | 描述 |
---|---|---|
Name | 名称 | 交换机的名称。 |
Type | 类型 | 交换机的类型,如direct , topic , fanout , headers 。 |
Features | 特性 | 交换机的特性,主要是是否持久化(D),还有是否内部使用(I)。 |
Message rate in | 消息流入率 | 消息发送到交换机的速率。 |
Message rate out | 消息流出率 | 从交换机路由到队列的消息的速率。 |
# 3.3 路由绑定页面
- To queue是用来指定队列与交换机之间的绑定关系。这个绑定是消息路由的基础,它建立了哪些队列可以从特定交换机接收消息的可能性。
- Routing key则是用来进一步决定在符合这种可能性的前提下,具体哪些消息会被路由到哪些队列。交换机会根据发送消息时指定的"Routing key",结合已建立的绑定关系中的路由键(可能是精确的字符串,模糊匹配模式,或其他规则),来决定消息的最终去向。
因此,一个队列要接收到交换机发来的消息,确实必须同时满足这两个条件:首先,队列需要与交换机有一个明确的绑定关系;其次,这个绑定需要包含一个路由键规则,该规则与消息的"Routing key"相匹配。
# 3.4 管道页面
在RabbitMQ的Web管理界面中,Channels(通道)页面展示了当前所有开启的通道及其详细信息。
英文名称 | 中文名称 | 描述 |
---|---|---|
Channel | 通道 | 通道的标识符,通常包含连接的IP地址和端口,以及通道编号。 |
User name | 用户名 | 创建通道的用户名称。 |
Mode | 模式 | 通道的工作模式,比如确认模式。 |
State | 状态 | 通道的当前状态,例如idle 表示空闲。 |
Unconfirmed | 未确认的消息数 | 发送给交换机但尚未收到确认的消息数量。 |
Prefetch | 预取数量 | 预取限制设置值,决定了消费者能够预取并处理多少条消息,而不必等待消息被确认,用于流控制。 |
Unacked | 未确认消息数 | 消费者已接收但尚未确认的消息数量。 |
publish | 发布速率 | 该通道发布消息到交换机的速率。 |
confirm | 确认速率 | 发布者收到消息确认的速率(如果开启了发布确认)。 |
unroutable (drop) | 不可路由(丢弃) | 消息无法路由到任何队列并被丢弃的速率。 |
deliver / get | 交付/获取速率 | 消息从队列交付给消费者的速率,或者通过basic.get 方法直接从队列中提取消息的速率。 |
ack | 确认速率 | 消费者确认消息的速率。 |