三、Publish/Subscribe 发布订阅模式
Publish/Subscribe:同时向多个消费者发送消息
上一章中介绍的工作队列,是以每一个任务都只有一个工作者接收并消费为前提的。
发布订阅模式与普通的 工作队列 最大的区别在于:
工作队列,以每个消息只能够被一个消费者消费为前提
发布订阅,则能将单个消息传递给多个订阅者
本章以一个简单的日志系统为例,一个程序发出日志消息,另一个程序接收并打印这些消息。
在日志系统中,有两个接收程序用于接收消息,一个接收器将日志导出到磁盘,另一个接收器展示日志到屏幕上。
本质上,已发布的日志消息将被广播到所有的接收者。
一、Exchanges 交换器
在之前的模式中,生产者产生消息并发送到消息队列上,由消息队列将消息按照顺序分配给消费者们。
而实际上,生产者在很多时候都不知道消息是否会被传递到任何队列。
因此,生产者通过向 Exchange(交换器) 发送消息,并由它将消息推送到队列中。交换器必须确切地知道如何处理它接收到的消息,是否应该将它添加到特定队列中?是否应该将它附加到许多队列中?或者应该丢弃它。其规则由交换器类型定义。
1.1 可用的交换类型
- direct 直接交换器
- 默认的 Exchange,完全根据 RoutingKey 来路由消息
- 设置 Exchange 和 Queue 的 Binding 时需要指定 RoutingKey(一般为 QueueName),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的 Queue
- 场景:如只将 Error 级别的日志写入磁盘的 Queue,可以使用指定的 RoutingKey 将写入磁盘文件的 Queue 绑定到 Direct Exchange 上
- topic 主题交换器
- 与 Direct Exchange 类似,都是通过 RoutingKey 来路由消息
- 区别在于 Topic Exchange 支持模糊匹配 RoutingKey
- * 表示匹配一个单词,#表示匹配没有或多个单词
- 场景:如根据日志级别和消息来源,分别记录日志:则可以将日志来源 [user.*] 的消息绑定到 Topic Exchange 上,日志级别 [*.info] 的消息绑定到另一个 Exchange 上
- headers 头文件交换器
- 忽略 RoutingKey,而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配路由
- 性能较差,且Direct Exchange 完全可以替代它
- fanout 扇形交换器
- 忽略 RoutingKey 的设置,直接将 Message 广播到所有绑定的 Queue
- 场景:日志系统,定义了两个 Queue 来存储消息,一个用于打印日志,一个用于写入磁盘日志文件。我们希望 Exchange 中的每一条消息都会被同时转发到两个 Queue 中
1.2 声明交换器
以扇形交换器为例:
// 参数分别为:交换器名称、交换器类型
channel.exchangeDeclare("logs",BuiltinExchangeType.FANOUT);
当手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与 队列名称相同。
生产者向交换器发送消息:
// 参数:
// 1. 交换器的名称,空字符串表示缺省或无名交换器;如果存在,则使用 routingKey 指定的名称将消息路由到队列
// 2. RoutingKey,可以是完整的队列名称,也可以带有匹配字符,且交换器为 fanout 类型时,会忽略该属性
// 3. 消息持久化方式
// 4. 消息主体
channel.basicPublish("logs", "hello", null, message.getBytes());
二、临时队列
2.1 声明队列
之前我们声明队列时,使用的是下面的方法:
channel.queueDeclare("hello", true, false, false, null);
分别指定了队列的名称、是否持久化、是否独占、是否自动删除、以及其它构造参数。
2.2 临时队列
但是对于日志系统来说,每当连接到 Rabbit 时,我们需要一个新的、空的队列;其次,当我们一旦断开消费者的连接后,队列应该被自动删除。
则我们可以通过 queueDeclare()
方法,创建一个非持久的、独占的、自动删除的队列,并生成一个名称:
// 返回的队列名称类似于:amq.gen-JzTY20BRgKO-HjmUJj0wLg
String queueName = channel.queueDeclare().getQueue();
三、Bindings 绑定
上面已经创建了一个 fanout 扇形交换器和 一个队列,现在需要告诉交换器向队列发送消息。
交换器与队列之间的关系称为绑定。
// 参数
// 1. queueName 队列名称
// 2. exchange 交换器名称
// 3. routingKey 路由
channel.queueBind(queueName, "logs", "");
四、完整代码
4.1 生产者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
public static void main(String[] args) {
// 连接服务器
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个扇形交换器
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 绑定交换器与队列
channel.queueBind(queueName, "logs", "");
// 发送消息
String message = "日志:线上服务没有问题";
channel.basicPublish("logs", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] sent:" + message);
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
4.2 订阅者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Subscribes {
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 绑定交换器与队列
channel.queueBind(queueName, "logs", "");
System.out.println("[1] Waiting for messages. To exit press CTRL+C");
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("[1] 接收消息: " + msg);
};
// 使用消息回调,缓冲消息
channel.basicConsume(queueName, true, callback, consumerTag -> {
});
}
}
4.3 注意事项
- 与之前的任务队列不同的是,发布订阅模式要求发布者在广播消息时,消费者必须在线才能接收