程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • 中间件 - RabbitMQ

    • 消息队列 - 介绍
    • RabbitMQ - 介绍
    • RabbitMQ - 安装
    • RabbitMQ - 基础案例
    • RabbitMQ - 应答与发布
    • RabbitMQ - 交换机
      • 1. Exchanges
        • Exchanges的类型
        • 默认exchange
      • 2. 临时队列
      • 3. 绑定bindings
      • 4. Fanout exchange
        • 扇出交换机(Fanout )介绍
        • 扇出交换机(Fanout )实战
      • 5. Direct exchange
        • 直接交换机(Direct )介绍
        • 多重绑定
        • Direct实战
      • 6. Topics exchange
        • 主题交换机(Topic)介绍
        • 主题交换机(Topic)案例
        • 主题交换机(Topic)实战
    • RabbitMQ - 死信队列
    • RabbitMQ - 延迟队列
    • RabbitMQ - 高级发布确认
    • RabbitMQ - 优先级
    • SpringBoot整合RabbitMQ
  • 消息数据库
  • 中间件 - RabbitMQ
scholar
2023-11-10
目录

RabbitMQ - 交换机

  • 1. Exchanges
    • Exchanges的类型
    • 默认exchange
  • 2. 临时队列
  • 3. 绑定bindings
  • 4. Fanout exchange
    • 扇出交换机(Fanout )介绍
    • 扇出交换机(Fanout )实战
  • 5. Direct exchange
    • 直接交换机(Direct )介绍
    • 多重绑定
    • Direct实战
  • 6. Topics exchange
    • 主题交换机(Topic)介绍
    • 主题交换机(Topic)案例
    • 主题交换机(Topic)实战

# 1. Exchanges

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机接收到生产者发来的消息后,根据预定的规则决定这些消息的去向。它可以是一个或多个队列,也可能是直接丢弃这些消息。如何处理,完全依赖于交换机的类型和配置。

image-20211110165519202

# Exchanges的类型

  1. 直接交换机(direct):处理路由键。需要将一个队列绑定到交换机上,要求发送的消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 abc ,则只有被标记为 abc 的消息才被转发,不会转发 abc.def,也不会转发 dog.ghi,只会转发 abc。

  2. 主题交换机(topic):将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”可以匹配一个或多个单词,符号 * 只能匹配一个单词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc.* 只会匹配到 abc.def。

  3. 头交换机(headers):不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。

    匹配规则 x-match 有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息

  4. 扇出交换机(fanout):不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到该交换机的消息会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。

# 默认exchange

当你通过channel.basicPublish方法发送消息时,如果将交换机名称参数设置为"",实际上使用的是默认交换机。

channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
1
  • 第一个参数""表示使用默认交换机。
  • 第二个参数TASK_QUEUE_NAME是消息的路由键,也是目标队列的名称。
  • 最后两个参数分别是消息的属性和消息体。

与其他交换机不同,使用默认交换机时,你不需要手动绑定队列和交换机。默认情况下,每个创建的队列都会自动绑定到默认交换机,绑定键(binding key)即为队列的名称。

image-20240326000246189

# 2. 临时队列

在RabbitMQ中,有时我们需要一个队列,它是临时的、独特的,而且当使用完毕后能自动删除。这种队列被称为临时队列。这样的队列特别适合在需要一个临时的工作空间的场景中,比如RPC实现或者基于订阅的任务分发。

临时队列的特点如下:

  • 随机名称:临时队列的名称是随机生成的,这确保了队列的唯一性。我们不需要提前知道队列的名称,RabbitMQ会为我们提供一个唯一的名称。
  • 连接关闭后自动删除:创建该队列的连接关闭以后,队列会自动删除。这意味着你不需要担心清理或删除队列的工作。

要创建一个临时队列,你可以使用如下代码:

String queueName = channel.queueDeclare().getQueue();
1
  • queueDeclare():这个方法没有提供队列名称参数,这让RabbitMQ知道我们需要一个新的、随机命名的队列。
  • getQueue():这个方法返回RabbitMQ为队列自动生成的名称,我们可以用这个名称来确定队列的身份。

image-20240326013506876

使用场景

  • 短暂的工作负载:当你有一些不需要长期存储消息,只是暂时处理数据的场景时,临时队列是非常有用的。
  • 动态订阅:在发布/订阅模型中,如果订阅者只对当前存在的消息感兴趣,之后就不再需要订阅了,使用临时队列可以自动管理队列的生命周期。
  • RPC实现:在请求-响应模式中,请求者可以创建一个临时队列用于接收响应,一旦请求完成,队列就没用了,可以自动删除。

# 3. 绑定bindings

在RabbitMQ中,绑定(Binding)是非常核心的概念,它充当了交换机(Exchange)和队列(Queue)之间的桥梁。通过定义绑定,RabbitMQ能够知道如何将消息从交换机路由到一个或多个特定的队列。

绑定的本质

在RabbitMQ中,只需要你将队列与交换机绑定,并指定了绑定规则(即绑定键或路由键),那么之后只要发送的消息携带的路由键符合这个绑定规则,消息就会被路由到相应的队列中。

绑定如何工作

  • 对于直接交换机:绑定会指定一个路由键。只有当消息的路由键与绑定的路由键完全匹配时,消息才会被路由到绑定的队列。
  • 对于主题交换机:绑定允许使用通配符。这意味着消息的路由键只需部分匹配绑定的路由键,就可以路由到队列。
  • 对于扇出交换机:绑定不需要路由键。所有的消息都会被路由到与交换机绑定的所有队列。
  • 对于头交换机:绑定是基于消息头中的键值对而不是路由键。匹配规则更加灵活。

假设我们有一个名为X的交换机,以及两个队列Q1和Q2。如果我们创建了以下两个绑定:

  • 交换机X绑定了Q1队列,路由键为"123"
  • 交换机X绑定了Q2队列,路由键为"456"

那么,任何发送到交换机X的、路由键为"123"的消息都会被路由到队列Q1;同样,路由键为"456"的消息会被路由到队列Q2。如果X是一个扇出交换机,那么所有发送到X的消息都会同时路由到Q1和Q2,无视路由键。

image-20211110171016043

# 4. Fanout exchange

# 扇出交换机(Fanout )介绍

扇出交换机(Fanout )是RabbitMQ中一种非常简单但强大的交换机类型,它的主要特点是广播。这种交换机将接收到的每条消息广播到所有与之绑定的队列,无视任何路由键(Routing Key)的设置。这使得扇出交换机非常适合于实现发布/订阅(pub-sub)消息模式。

image-20211110171053309

# 扇出交换机(Fanout )实战

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘。

图例

image-20211110171112477

Logs 和临时队列的绑定关系如下图

image-20240326013749421

代码

注意

先启动两个消费者再启动生产者。

生产者生产消息后,如果没有对应的消费者接收,则该消息是遗弃的消息

2023-12-08 @scholar

生产者EmitLog 发送消息给两个消费者进行消费

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 生产者 - EmitLog
 * 使用扇出交换机发送日志消息。
 */
public class EmitLog {
    // 定义扇出交换机的名称
    private static final String EXCHANGE_NAME = "logs";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 通过工具类获取与RabbitMQ服务的连接通道
        Channel channel = RabbitMQUtils.getChannel();
        /**
         * 声明(创建)一个扇出类型的交换机
         * 参数1: 交换机名称
         * 参数2: 交换机类型 - fanout 表示扇出交换机
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 创建一个扫描器对象,用于接收控制台输入的消息
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入信息:");
        // 循环读取控制台输入的消息
        while (sc.hasNext()) {
            // 读取一行输入作为消息
            String message = sc.nextLine();
            /**
             * 将消息发布到扇出交换机
             * 参数1: 交换机名称
             * 参数2: 路由键 - 在扇出交换机中,此参数被忽略
             * 参数3: 消息的其他属性 - 如消息头等
             * 参数4: 消息内容字节数组
             */
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            // 控制台输出发送的消息
            System.out.println("生产者发出消息:" + message);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  • ReceiveLogs01 负责将接收到的消息打印在控制台,ReceiveLogs02 负责将介绍到的消息写出到文件。
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
    * 消费者 ReceiveLogs01
    * 从扇出交换机接收消息并打印到控制台。
    */
    public class ReceiveLogs01 {
      // 交换机名称
      private static final String EXCHANGE_NAME = "logs";
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 获取RabbitMQ连接通道
          Channel channel = RabbitMQUtils.getChannel();
          // 声明扇出类型的交换机
          channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
          /**
           * 创建一个临时队列,队列名称是随机的。
           * 这种队列是非持久的、独占的、自动删除的,主要用于此类临时场景。
           */
          String queueName = channel.queueDeclare().getQueue();
          // 将临时队列绑定到指定的交换机上,无需指定路由键(fanout交换机不需要路由键)
          channel.queueBind(queueName, EXCHANGE_NAME, "");
          System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
    
          // 消息到达的回调函数
          DeliverCallback deliverCallback = (consumerTag, delivery) -> {
              String message = new String(delivery.getBody(), "UTF-8");
              // 在控制台打印接收到的消息
              System.out.println("控制台打印接收到的消息:" + message);
          };
          // 开始监听队列上的消息
          channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.apache.commons.io.FileUtils;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
    * 消费者2 - ReceiveLogs02
    * 从扇出交换机接收消息,并将消息内容写入到指定文件中。
    */
    public class ReceiveLogs02 {
      // 扇出交换机的名称
      private static final String EXCHANGE_NAME = "logs";
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 获取与RabbitMQ服务器的连接通道
          Channel channel = RabbitMQUtils.getChannel();
          // 声明一个扇出类型的交换机
          channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
          /**
           * 创建一个临时队列,队列名称是随机的。
           * 这种队列是非持久的、独占的、且断开连接时自动删除的。
           */
          String queueName = channel.queueDeclare().getQueue();
          // 将临时队列绑定到扇出交换机上,路由键为空字符串(因为扇出交换机会忽略路由键)
          channel.queueBind(queueName, EXCHANGE_NAME, "");
          System.out.println("等待接收消息,把接收到的消息写到文件...........");
    
          // 定义接收消息的回调
          DeliverCallback deliverCallback = (consumerTag, delivery) -> {
              String message = new String(delivery.getBody(), "UTF-8");
              // 指定消息写入的文件
              File file = new File("D:\\rabbitmq_info.txt");
              if(!file.exists()) {
                  file.createNewFile();
              }
              // 使用Apache Commons IO库将消息写入文件
              FileUtils.writeStringToFile(file, message, "UTF-8", true);
              System.out.println("数据写入文件成功");
          };
          // 开始接收消息。自动消息确认打开。
          channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    // Make sure to add code blocks to your code group

    # 5. Direct exchange

    在本节中,我们将通过引入直接交换机(Direct Exchange)的概念来扩展之前构建的日志系统。直接交换机使我们能够实现更精细的消息路由控制,让不同的消费者根据特定的标准(如消息的严重级别)订阅消息。这样,我们可以将关键日志(例如严重错误)定向地存储到日志文件中,而其他日志信息则可以在控制台上打印,从而有效地管理资源和信息流。

    # 直接交换机(Direct )介绍

    工作原理:与扇出交换机(Fanout Exchange)不同,直接交换机不会盲目地广播消息到所有绑定的队列。而是根据消息的路由键和队列的绑定键精确匹配,只有当两者完全相同,消息才会被路由到对应的队列。

    使用场景:这种精确匹配的特性使直接交换机非常适合实现如日志级别过滤这类场景。例如,我们可以设置只有当消息标记为“error”级别时,它才被路由到负责存储错误日志的队列;而其他级别的日志(如“warning”和“info”)则可以路由到其他队列或不进行存储。

    image-20211110172746642

    在上面这张图中,我们可以看到 交换机X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

    在这种绑定情况下,生产者发布消息到 交换机X 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black 和 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

    # 多重绑定

    image-20211110172858521

    当然如果 交换机X的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型虽然是 direct, 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

    # Direct实战

    关系:

    image-20211110173025837

    交换机:

    image-20240326021330737

    C1 消费者:绑定 console 队列,routingKey 为 info、warning

    C2 消费者:绑定 disk 队列,routingKey 为 error

    当生产者发布消息到 direct_logs 交换机里,该交换机会检测消息的 routingKey 条件,然后分配到满足条件的队列里,最后由消费者从队列消费消息。

    生产者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 生产者 EmitLogDirect
     * 使用直接交换机发送带有不同路由键的消息。
     */
    public class EmitLogDirect {
        // 直接交换机的名称
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取与RabbitMQ服务的连接通道
            Channel channel = RabbitMQUtils.getChannel();
            // 声明一个直接类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            
            // 创建一个包含不同路由键及对应消息的映射关系
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info", "这是一个 info 信息");
            bindingKeyMap.put("warning", "这是一个 warning 信息");
            bindingKeyMap.put("error", "这是一个 error 信息");
            // 注意:debug 没有对应的队列消费者,所以这个消息会丢失
            bindingKeyMap.put("debug", "这是一个 debug 信息");
    
            // 遍历映射关系,发送消息
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                String bindingKey = bindingKeyEntry.getKey(); // 路由键
                String message = bindingKeyEntry.getValue(); // 消息内容
                
                // 使用指定的路由键发布消息
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes());
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40

    两个消费者

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      import java.util.concurrent.TimeoutException;
      
      /**
      * 消费者C1 - 专门接收info和warning级别的日志消息。
      */
      public class ReceiveLogsDirect01 {
        // 直接交换机的名称
        public static final String EXCHANGE_NAME = "direct_logs";
      
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取与RabbitMQ服务的连接通道
            Channel channel = RabbitMQUtils.getChannel();
            // 声明一个直接类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 指定队列名称
            String queueName = "console";
            // 声明队列(非持久、非独占、自动删除)
            channel.queueDeclare(queueName, false, false, false, null);
            // 绑定队列到交换机,并指定接收info和warning路由键的消息
            channel.queueBind(queueName, EXCHANGE_NAME, "info");
            channel.queueBind(queueName, EXCHANGE_NAME, "warning");
      
            System.out.println("等待接收消息...");
            
            // 消息到达的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // 将消息体转换为字符串
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                // 构造显示的消息,包括路由键和消息体
                message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
                System.out.println("info和warning消息已经接收:\n" + message);
            };
            // 开始消费队列上的消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      import java.util.concurrent.TimeoutException;
      
      /**
      * 消费者 C2 - 专门接收error级别的日志消息。
      */
      public class ReceiveLogsDirect02 {
        // 直接交换机的名称
        public static final String EXCHANGE_NAME = "direct_logs";
      
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取与RabbitMQ服务的连接通道
            Channel channel = RabbitMQUtils.getChannel();
            // 声明一个直接类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 指定队列名称
            String queueName = "disk";
            // 声明队列(非持久、非独占、自动删除)
            channel.queueDeclare(queueName, false, false, false, null);
            // 绑定队列到交换机,并指定接收error路由键的消息
            channel.queueBind(queueName, EXCHANGE_NAME, "error");
      
            System.out.println("等待接收消息...");
            
            // 消息到达的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // 将消息体转换为字符串
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                // 构造显示的消息,包括路由键和消息体
                message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
                System.out.println("error 消息已经接收:\n" + message);
            };
            // 开始消费队列上的消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      // Make sure to add code blocks to your code group

      # 6. Topics exchange

      # 主题交换机(Topic)介绍

      在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。

      尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说某个队列想同时接收的日志类型有 info.base 和 info.advantage,而某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用主题交换机(Topic)。

      Topic 的要求

      交换机类型是 topic时,我们发送消息在指定 routing_key 的时候不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" 这种类型的。

      这些单词可以是任意单词,同时这个单词列表最多不能超过 255 个字节。

      在这个规则列表中,其中有两个替换符是大家需要注意的:

      • *(星号)匹配路由键中的一个单词。
      • #(井号)匹配路由键中的零个或多个单词。

      # 主题交换机(Topic)案例

      下图绑定关系如下

      image-20211110175137166

      • Q1队列-->绑定的路由规则如下
        • 中间带 orange 的 3 个单词的字符串 (*.orange.*)
      • Q2队列-->绑定的路由规则如下
        • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
        • 第一个单词是 lazy 的多个单词 (lazy.#)

      上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

      例子 说明
      quick.orange.rabbit 被队列 Q1和Q2 同时接收到
      azy.orange.elephant 被队列 Q1和Q2 同时接收到
      quick.orange.fox 被队列 Q1 接收到
      lazy.brown.fox 被队列 Q2 接收到
      lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
      quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
      quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
      lazy.orange.male.rabbit 是四个单词但匹配 Q2

      笔记

      • 当一个队列绑定键是 #,那么这个队列将接收所有数据,就有点像 fanout 了

      • 如果队列绑定键当中没有 # 和 * 出现,那么该队列绑定类型就有点像 direct 了

      2023-12-08 @scholar

      # 主题交换机(Topic)实战

      image-20240326030056132

      生产多个消息到交换机,交换机按照通配符分配消息到不同的队列中,队列由消费者进行消费

      生产者 EmitLogTopic

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.scholar.rabbaitmq.demo2.RabbitMQUtils;
      
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.concurrent.TimeoutException;
      
      /**
       * 生产者 EmitLogTopic - 使用主题交换机发送消息。
       */
      public class EmitLogTopic {
      
          // 主题交换机的名称
          private static final String EXCHANGE_NAME = "topic_logs";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              // 获取与RabbitMQ服务的连接通道
              Channel channel = RabbitMQUtils.getChannel();
              // 声明一个主题类型的交换机
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
      
              /**
               * Q1队列-->绑定的路由规则如下
               *      中间带 orange 带 3 个单词的字符串(*.orange.*)
               * Q2队列-->绑定的路由规则如下
               *      最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
               *      第一个单词是 lazy 的多个单词(lazy.#)
               */
              HashMap<String, String> bindingKeyMap = new HashMap<>();
              bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
              bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
              bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
              bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
              bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
              bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
              bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
              bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
      
              // 遍历映射关系,发布消息
              for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                  String bindingKey = bindingKeyEntry.getKey(); // 获取路由键
                  String message = bindingKeyEntry.getValue(); // 获取消息内容
                  
                  // 发布消息到交换机,指定路由键
                  channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
                  System.out.println("生产者发出消息:" + message);
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52

      两个消费者

        import com.rabbitmq.client.BuiltinExchangeType;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.DeliverCallback;
        
        import java.io.IOException;
        import java.nio.charset.StandardCharsets;
        import java.util.concurrent.TimeoutException;
        
        /**
         * 消费者 1 - 订阅 "*.orange.*" 模式的消息。
         */
        public class ReceiveLogsTopic01 {
            // 主题交换机的名称
            private static final String EXCHANGE_NAME = "topic_logs";
        
            public static void main(String[] args) throws IOException, TimeoutException {
                // 获取与RabbitMQ服务的连接通道
                Channel channel = RabbitMQUtils.getChannel();
                // 声明一个主题类型的交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                
                // 队列名称
                String queueName = "Q1";
                // 声明队列(非持久、非独占、自动删除)
                channel.queueDeclare(queueName, false, false, false, null);
                // 将队列绑定到交换机,并指定符合 "*.orange.*" 模式的路由键
                channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        
                System.out.println("等待接收消息........... ");
        
                // 消息到达的回调函数
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    // 将消息体转换为字符串
                    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    // 打印接收到的消息,包括队列名、绑定键和消息内容
                    System.out.println("接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
                };
                // 开始消费队列上的消息
                channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
            }
        }
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        import com.rabbitmq.client.BuiltinExchangeType;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.DeliverCallback;
        
        import java.io.IOException;
        import java.nio.charset.StandardCharsets;
        import java.util.concurrent.TimeoutException;
        
        /**
         * 消费者 2 - 订阅 "*.*.rabbit" 和 "lazy.#" 模式的消息。
         */
        public class ReceiveLogsTopic02 {
            // 主题交换机的名称
            private static final String EXCHANGE_NAME = "topic_logs";
        
            public static void main(String[] args) throws IOException, TimeoutException {
                // 获取与RabbitMQ服务的连接通道
                Channel channel = RabbitMQUtils.getChannel();
                // 声明一个主题类型的交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                
                // 队列名称
                String queueName = "Q2";
                // 声明队列(非持久、非独占、自动删除)
                channel.queueDeclare(queueName, false, false, false, null);
                // 将队列绑定到交换机,并指定两种模式的路由键
                channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
                channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        
                System.out.println("等待接收消息........... ");
        
                // 消息到达的回调函数
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    // 将消息体转换为字符串
                    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    // 打印接收到的消息,包括队列名、绑定键和消息内容
                    System.out.println("接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
                };
                // 开始消费队列上的消息
                channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
            }
        }
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        // Make sure to add code blocks to your code group
        编辑此页 (opens new window)
        上次更新: 2024/12/28, 18:32:08
        RabbitMQ - 应答与发布
        RabbitMQ - 死信队列

        ← RabbitMQ - 应答与发布 RabbitMQ - 死信队列→

        Theme by Vdoing | Copyright © 2019-2025 程序员scholar
        • 跟随系统
        • 浅色模式
        • 深色模式
        • 阅读模式