五、Topics 主题
Topics:基于主题(pattern)接收消息。
在上一个 Routing 路由 案例中,我们改进了日志系统,使其不再只能使用扇形交换广播给所有订阅者,而是使用直接广播,有选择地接收日志。
但它仍然具有局限性,不能基于多个标准进行路由:除了日志级别分开记录外,我们还希望能够根据日志来源来路由日志。
一、Topic Exchange 主题交换器
发送到主题交换器的消息不能有任意的 RoutingKey,它必须是由点分隔的单词列表。如:stock.usd.nyse、nyse.vmw、quick.orange.rabbit。RoutingKey 不能超过 255 个字节。
RoutingKey 的逻辑:
- *:代表一个单词
- #:表示零个或多个单词
二、示例
示例以:[速度,颜色,物种]为组合的路由键(RoutingKey)。
分别创建三个绑定:*.organge.*,*.*.rabbit,lazy.#。
则 fast.white.rabbit 将会匹配到 *.*.rabbit;fast.white.rabbit.other 将会丢失,无法匹配;
此外,如果 RoutingKey 中直接使用 ‘#’ 绑定,则 Queue 中将接收所有类型的消息。
三、完整代码
3.1 发布者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
// 连接服务器
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个扇形交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 可以依次测试 fast.white.rabbit √、slow.black.rabbit √、fast.black.dog ×
final String routingKey = "fast.black.dog";
// 绑定交换器与队列
channel.queueBind(queueName, "logs", routingKey);
// 发送消息5条消息
String message = "这是一只跑的特别快的白色兔子";
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] sent:" + message);
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
3.2 订阅者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Subscribes {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明一个临时队列
final String queueName = channel.queueDeclare().getQueue();
// 绑定队列与交换器,并制定routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
System.out.println("[1] Waiting for messages. To exit press CTRL+C");
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
final String routingKey = message.getEnvelope().getRoutingKey();
System.out.println(String.format("接收到来自 [%s] 的消息: %s", routingKey, msg));
};
// 使用消息回调,缓冲消息
channel.basicConsume(queueName, true, callback, consumerTag -> {
});
}
}