七、Publisher Confirms 发布确认
当 Publisher Confirms 在 Channel 上启用时,客户端发布的消息由代理异步确认,这意味着它们已经在服务器端得到了处理。
只要消息发送成功,就会进行确认,不管有没有消费者存在,或者消费者有没有消费成功。
一、开启 Publisher Confirms
默认情况下 Publisher Confirms 不启用,可以使用 confirmSelect 方法启用 Channel 级别的 confirm。
Channel channel = connection.createChannel();
channel.confirmSelect();
二、确认策略
2.1 单独发布消息
单独发布消息,即发布一条消息,并同步等待其确认:
channel.basicPublish(EXCHANGE_NAME, routingKey,
properties, message.getBytes());
channel.waitForConfirmsOrDie(5000);
waitForConfirmsOrDie:在指定时间内等待消息确认,如果超时则该方法将抛出异常。异常的处理通常包括记录错误消息/重新尝试发送消息。
这种技术非常简单,但是有一个主要的缺点:由于消息的确认阻碍了后续消息的发布,会显著减缓发布的速度。
2.2 批量发布消息
为了改进前面的示例,我们可以发布一批消息,然后等待确认整批消息:
int batchSize = 100;
int outstandingMessageCount = 0;
while(thereAreMessagesToPublish()){
channel.basicPublish(EXCHANGE_NAME, queueName, properties, message.getBytes());
outstandingMessageCount++;
if(outstandingMessageCount == batchSize){
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
if(outstandingMessageCount > 0){
channel.waitForConfirmsOrDie(5000);
}
优点:与单个消息确认相比,批量等待消息确认大大提高了吞吐量;
缺点是:不知道在失败时到底出了什么问题,所以我们需要记录整个批处理,或者重新发布所有消息;
且该方法仍然是同步的,依然后阻碍后续消息的发布。
2.3 异步处理消息确认
通过在 客户端 上注册一个回调,就能得到确认消息的通知:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed 消息确认回调
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed 取消应答回调
})
每个回调方法,包含两个参数:
- sequence number:序列号
- multiple:true 表示确认多条序列号相同或更低的消息;false 表示确认一条消息
获得序列号的方法:
// 在发布消息前,获得下一次发布的序列号
int sequenceNumber = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, queueName, properties, message.getBytes());
以下示例包含一个回调,在消息确认时清理映射。
ConcurrentNavigableMap<Long,String> confirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if(multiple){
ConcurrentNavigableMap<Long,String> confirmed = confirms.headMap(sequenceNumber, true);
confirmed.clear();
} else {
confirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = confirms.get(sequenceNumber);
System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
})
// ... publishing code
总而言之,异步处理发布确认,通常需要以下几个步骤:
- 将发布序列号与消息相关联
- 在 Channel 上注册一个 confirm 监听器,当发布者 ack/nack 到达并执行操作时通知它。(序列号与消息关联机制可能还需要进行一些清理)
- 在发布消息之前跟踪发布序列号
尽量避免在回调中重新发布一个 nack-ed 的消息,因为确认回调是在 I/O 线程中发送的,Channel 不应该在这个线程中执行操作。
更好的解决方法是:在内存中的队列对消息进行排队,该队列由发布线程轮询。像 ConcurrentLinkedQueue 这样的类是在确认回调和发布线程间传输消息的好的选择。
三、总结
- 单独发布消息:同步等待确认;简单;吞吐量非常有限
- 批量发布消息:同步等待批处理的确认;简单;合理的吞吐量;出问题后很难排查
- 异步处理:最佳的性能和资源利用;良好的控制错误的情况下,可以参与正确实施