四、Routing 路由
Routing:有选择地接收消息。
路由选择,在本示例中,将错误消息保存到日志文件中,且同时能在控制台打印所有的日志消息。
如下图所示:
一、Bindings 绑定
在前面的例子中,我们将扇形交换器与队列做了绑定:
channel.queueBind(queueName, exchangeName, "");
可以理解为:队列会接收来自指定交换器的消息;或者是交换器会向绑定的队列传递消息。
Bindings 可以采用额外的 RoutingKey 参数,但是扇形交换器会忽略 RoutingKey。
二、 Direct Exchange 直接交换器
之前的日志系统,将日志广播给所有的消费者。现在,我们希望程序只将错误日志写入磁盘,以减少磁盘空间的占用。
扇形交换器(fanout)只能是无意识的广播,因此,我们将改用 Direct Exchange(直接交换器),消息发送到其绑定见(RoutingKey)与消息的路由键完全匹配的队列。
2.1 多个绑定
同一个交换器是可以绑定给多个队列的。
2.2 多重绑定
同一个路由也是可以绑定给多个队列的。
2.3 声明Exchange
像之前构建 fanout 扇形交换器一样:
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
发送消息:
channel.basicPublish(EXCHANGE_NAME, severity,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
2.4 订阅
与之前的示例一样:
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, severity);
三、完整代码
如果exchange存在,可以换一个或者删了就行。
3.1 生产者
这里直接让生产者一次发送5条日志消息。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
// 连接服务器
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个扇形交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 绑定交换器与队列
channel.queueBind(queueName, "logs", "");
// 发送消息5条消息
for (int i = 0; i < 5; i++) {
String routingKey = i < 2 ? "info" : i < 4 ? "error" : "debug";
String message = "日志:线上服务没有问题";
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] sent:" + message);
}
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
3.2 订阅者1
打印所有级别的日志至控制台,所以该订阅者需要绑定 logs 交换器下的所有路由:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Subscribes {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 绑定所有日志类型
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "debug");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("[1] Waiting for messages. To exit press CTRL+C");
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
final String routingKey = message.getEnvelope().getRoutingKey();
System.out.println(String.format("[%s] 打印信息至控制台: %s", routingKey, msg));
};
// 使用消息回调,缓冲消息
channel.basicConsume(queueName, true, callback, consumerTag -> {
});
}
}
3.3 订阅者2
该订阅者保存 error 级别的日志至文件中,因此只需要绑定 logs 交换器下的 error 路由即可:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Subscribes2 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 只绑定debug类型的路由
channel.queueBind(queueName, EXCHANGE_NAME, "debug");
System.out.println("[1] Waiting for messages. To exit press CTRL+C");
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
final String routingKey = message.getEnvelope().getRoutingKey();
System.out.println(String.format("[%s] 保存日志文件: %s", routingKey, msg));
};
// 使用消息回调,缓冲消息
channel.basicConsume(queueName, true, callback, consumerTag -> {
});
}
}
3.4 执行结果
依次执行 Subscribes、Subscribes2、Publisher,可得到以下日志信息:
Publisher:
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
Subscribes:
[1] Waiting for messages. To exit press CTRL+C
[info] 打印信息至控制台: 日志:线上服务没有问题
[info] 打印信息至控制台: 日志:线上服务没有问题
[error] 打印信息至控制台: 日志:线上服务没有问题
[error] 打印信息至控制台: 日志:线上服务没有问题
[debug] 打印信息至控制台: 日志:线上服务没有问题
Subscribes2:
[1] Waiting for messages. To exit press CTRL+C
[debug] 保存日志文件: 日志:线上服务没有问题