RabbitMQ(四)-Routing直接路由选择

四、Routing 路由

Routing:有选择地接收消息。

路由选择,在本示例中,将错误消息保存到日志文件中,且同时能在控制台打印所有的日志消息。

如下图所示:

RoutingKey示例

一、Bindings 绑定

在前面的例子中,我们将扇形交换器与队列做了绑定:

channel.queueBind(queueName, exchangeName, "");

可以理解为:队列会接收来自指定交换器的消息;或者是交换器会向绑定的队列传递消息。

Bindings 可以采用额外的 RoutingKey 参数,但是扇形交换器会忽略 RoutingKey。

二、 Direct Exchange 直接交换器

之前的日志系统,将日志广播给所有的消费者。现在,我们希望程序只将错误日志写入磁盘,以减少磁盘空间的占用。

扇形交换器(fanout)只能是无意识的广播,因此,我们将改用 Direct Exchange(直接交换器),消息发送到其绑定见(RoutingKey)与消息的路由键完全匹配的队列。

2.1 多个绑定

同一个交换器是可以绑定给多个队列的。

2.2 多重绑定

同一个路由也是可以绑定给多个队列的。

2.3 声明Exchange

像之前构建 fanout 扇形交换器一样:

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

发送消息:

channel.basicPublish(EXCHANGE_NAME, severity, 
                     MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

2.4 订阅

与之前的示例一样:

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, EXCHANGE_NAME, severity);

三、完整代码

如果exchange存在,可以换一个或者删了就行。

3.1 生产者

这里直接让生产者一次发送5条日志消息。

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Publisher {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        // 连接服务器
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个扇形交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // 声明一个临时队列
            final String queueName = channel.queueDeclare().getQueue();

            // 绑定交换器与队列
            channel.queueBind(queueName, "logs", "");

            // 发送消息5条消息
            for (int i = 0; i < 5; i++) {
                String routingKey = i < 2 ? "info" : i < 4 ? "error" : "debug";
                String message = "日志:线上服务没有问题";
                channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("[x] sent:" + message);
            }
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}

3.2 订阅者1

打印所有级别的日志至控制台,所以该订阅者需要绑定 logs 交换器下的所有路由:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Subscribes {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明一个临时队列
        final String queueName = channel.queueDeclare().getQueue();

        // 绑定所有日志类型
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println("[1] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            final String routingKey = message.getEnvelope().getRoutingKey();
            System.out.println(String.format("[%s] 打印信息至控制台: %s", routingKey, msg));
        };

        // 使用消息回调,缓冲消息
        channel.basicConsume(queueName, true, callback, consumerTag -> {
        });
    }
}

3.3 订阅者2

该订阅者保存 error 级别的日志至文件中,因此只需要绑定 logs 交换器下的 error 路由即可:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Subscribes2 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明一个临时队列
        final String queueName = channel.queueDeclare().getQueue();

        // 只绑定debug类型的路由
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");

        System.out.println("[1] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            final String routingKey = message.getEnvelope().getRoutingKey();
            System.out.println(String.format("[%s] 保存日志文件: %s", routingKey, msg));
        };

        // 使用消息回调,缓冲消息
        channel.basicConsume(queueName, true, callback, consumerTag -> {
        });
    }
}

3.4 执行结果

依次执行 Subscribes、Subscribes2、Publisher,可得到以下日志信息:

Publisher:
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题
[x] sent:日志:线上服务没有问题

Subscribes:
[1] Waiting for messages. To exit press CTRL+C
[info] 打印信息至控制台: 日志:线上服务没有问题
[info] 打印信息至控制台: 日志:线上服务没有问题
[error] 打印信息至控制台: 日志:线上服务没有问题
[error] 打印信息至控制台: 日志:线上服务没有问题
[debug] 打印信息至控制台: 日志:线上服务没有问题

Subscribes2:
[1] Waiting for messages. To exit press CTRL+C
[debug] 保存日志文件: 日志:线上服务没有问题

四、参考资料

文章作者: koral
文章链接: http://luokaiii.github.io/2019/12/09/后端/消息队列/RocketMQ/5.Routing/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自