RabbitMQ(五)-Topic主题路由消息

五、Topics 主题

Topics:基于主题(pattern)接收消息。

在上一个 Routing 路由 案例中,我们改进了日志系统,使其不再只能使用扇形交换广播给所有订阅者,而是使用直接广播,有选择地接收日志。

但它仍然具有局限性,不能基于多个标准进行路由:除了日志级别分开记录外,我们还希望能够根据日志来源来路由日志。

一、Topic Exchange 主题交换器

发送到主题交换器的消息不能有任意的 RoutingKey,它必须是由点分隔的单词列表。如:stock.usd.nyse、nyse.vmw、quick.orange.rabbit。RoutingKey 不能超过 255 个字节。

RoutingKey 的逻辑:

  • *:代表一个单词
  • #:表示零个或多个单词

二、示例

示例以:[速度,颜色,物种]为组合的路由键(RoutingKey)。

Topic 主题路由

分别创建三个绑定:*.organge.*,*.*.rabbit,lazy.#。

fast.white.rabbit 将会匹配到 *.*.rabbitfast.white.rabbit.other 将会丢失,无法匹配;

此外,如果 RoutingKey 中直接使用 ‘#’ 绑定,则 Queue 中将接收所有类型的消息。

三、完整代码

3.1 发布者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Publisher {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        // 连接服务器
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个扇形交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            // 声明一个临时队列
            final String queueName = channel.queueDeclare().getQueue();

            // 可以依次测试 fast.white.rabbit √、slow.black.rabbit √、fast.black.dog ×
            final String routingKey = "fast.black.dog";

            // 绑定交换器与队列
            channel.queueBind(queueName, "logs", routingKey);

            // 发送消息5条消息
            String message = "这是一只跑的特别快的白色兔子";
            channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("[x] sent:" + message);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}

3.2 订阅者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Subscribes {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 声明一个临时队列
        final String queueName = channel.queueDeclare().getQueue();

        // 绑定队列与交换器,并制定routingKey
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");

        System.out.println("[1] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            final String routingKey = message.getEnvelope().getRoutingKey();
            System.out.println(String.format("接收到来自 [%s] 的消息: %s", routingKey, msg));
        };

        // 使用消息回调,缓冲消息
        channel.basicConsume(queueName, true, callback, consumerTag -> {
        });
    }
}

四、参考资料

文章作者: koral
文章链接: http://luokaiii.github.io/2019/12/09/后端/消息队列/RocketMQ/6.Topics/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自