RabbitMQ(三)-发布订阅模式、交换器类型

三、Publish/Subscribe 发布订阅模式

Publish/Subscribe:同时向多个消费者发送消息

上一章中介绍的工作队列,是以每一个任务都只有一个工作者接收并消费为前提的。

发布订阅模式与普通的 工作队列 最大的区别在于:

  • 工作队列,以每个消息只能够被一个消费者消费为前提

  • 发布订阅,则能将单个消息传递给多个订阅者

    本章以一个简单的日志系统为例,一个程序发出日志消息,另一个程序接收并打印这些消息。

在日志系统中,有两个接收程序用于接收消息,一个接收器将日志导出到磁盘,另一个接收器展示日志到屏幕上。

本质上,已发布的日志消息将被广播到所有的接收者。

发布订阅模式

一、Exchanges 交换器

在之前的模式中,生产者产生消息并发送到消息队列上,由消息队列将消息按照顺序分配给消费者们。

而实际上,生产者在很多时候都不知道消息是否会被传递到任何队列。

因此,生产者通过向 Exchange(交换器) 发送消息,并由它将消息推送到队列中。交换器必须确切地知道如何处理它接收到的消息,是否应该将它添加到特定队列中?是否应该将它附加到许多队列中?或者应该丢弃它。其规则由交换器类型定义。

1.1 可用的交换类型

  • direct 直接交换器
    • 默认的 Exchange,完全根据 RoutingKey 来路由消息
    • 完全匹配RoutingKey
    • 设置 Exchange 和 Queue 的 Binding 时需要指定 RoutingKey(一般为 QueueName),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的 Queue
    • 场景:如只将 Error 级别的日志写入磁盘的 Queue,可以使用指定的 RoutingKey 将写入磁盘文件的 Queue 绑定到 Direct Exchange 上
  • topic 主题交换器
    • 与 Direct Exchange 类似,都是通过 RoutingKey 来路由消息
    • 区别在于 Topic Exchange 支持模糊匹配 RoutingKey
    • Topic Exchange
    • * 表示匹配一个单词,#表示匹配没有或多个单词
    • 场景:如根据日志级别和消息来源,分别记录日志:则可以将日志来源 [user.*] 的消息绑定到 Topic Exchange 上,日志级别 [*.info] 的消息绑定到另一个 Exchange 上
  • headers 头文件交换器
    • 忽略 RoutingKey,而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配路由
    • 性能较差,且Direct Exchange 完全可以替代它
  • fanout 扇形交换器
    • 忽略 RoutingKey 的设置,直接将 Message 广播到所有绑定的 Queue
    • 场景:日志系统,定义了两个 Queue 来存储消息,一个用于打印日志,一个用于写入磁盘日志文件。我们希望 Exchange 中的每一条消息都会被同时转发到两个 Queue 中

1.2 声明交换器

以扇形交换器为例:

// 参数分别为:交换器名称、交换器类型
channel.exchangeDeclare("logs",BuiltinExchangeType.FANOUT);

当手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与 队列名称相同。

生产者向交换器发送消息:

// 参数:
// 1. 交换器的名称,空字符串表示缺省或无名交换器;如果存在,则使用 routingKey 指定的名称将消息路由到队列
// 2. RoutingKey,可以是完整的队列名称,也可以带有匹配字符,且交换器为 fanout 类型时,会忽略该属性
// 3. 消息持久化方式
// 4. 消息主体
channel.basicPublish("logs", "hello", null, message.getBytes());

二、临时队列

2.1 声明队列

之前我们声明队列时,使用的是下面的方法:

channel.queueDeclare("hello", true, false, false, null);

分别指定了队列的名称、是否持久化、是否独占、是否自动删除、以及其它构造参数。

2.2 临时队列

但是对于日志系统来说,每当连接到 Rabbit 时,我们需要一个新的、空的队列;其次,当我们一旦断开消费者的连接后,队列应该被自动删除。

则我们可以通过 queueDeclare() 方法,创建一个非持久的、独占的、自动删除的队列,并生成一个名称:

// 返回的队列名称类似于:amq.gen-JzTY20BRgKO-HjmUJj0wLg
String queueName = channel.queueDeclare().getQueue();

三、Bindings 绑定

上面已经创建了一个 fanout 扇形交换器和 一个队列,现在需要告诉交换器向队列发送消息。

交换器与队列之间的关系称为绑定。

// 参数
// 1. queueName 队列名称
// 2. exchange 交换器名称
// 3. routingKey 路由
channel.queueBind(queueName, "logs", "");

四、完整代码

4.1 生产者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Publisher {

    public static void main(String[] args) {
        // 连接服务器
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个扇形交换器
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

            // 声明一个临时队列
            final String queueName = channel.queueDeclare().getQueue();

            // 绑定交换器与队列
            channel.queueBind(queueName, "logs", "");

            // 发送消息
            String message = "日志:线上服务没有问题";
            channel.basicPublish("logs", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("[x] sent:" + message);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}

4.2 订阅者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Subscribes {

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明交换器
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

        // 声明一个临时队列
        final String queueName = channel.queueDeclare().getQueue();

        // 绑定交换器与队列
        channel.queueBind(queueName, "logs", "");

        System.out.println("[1] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("[1] 接收消息: " + msg);
        };

        // 使用消息回调,缓冲消息
        channel.basicConsume(queueName, true, callback, consumerTag -> {
        });
    }
}

4.3 注意事项

  1. 与之前的任务队列不同的是,发布订阅模式要求发布者在广播消息时,消费者必须在线才能接收

五、参考资料

文章作者: koral
文章链接: http://luokaiii.github.io/2019/12/09/后端/消息队列/RocketMQ/4.PublishSubscribe/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自