RabbitMQ(七)-Publisher Confirms 发布确认

七、Publisher Confirms 发布确认

Publisher Confirms 在 Channel 上启用时,客户端发布的消息由代理异步确认,这意味着它们已经在服务器端得到了处理。

只要消息发送成功,就会进行确认,不管有没有消费者存在,或者消费者有没有消费成功。

一、开启 Publisher Confirms

默认情况下 Publisher Confirms 不启用,可以使用 confirmSelect 方法启用 Channel 级别的 confirm。

Channel channel = connection.createChannel();
channel.confirmSelect();

二、确认策略

2.1 单独发布消息

单独发布消息,即发布一条消息,并同步等待其确认:

channel.basicPublish(EXCHANGE_NAME, routingKey,
                     properties, message.getBytes());
channel.waitForConfirmsOrDie(5000);

waitForConfirmsOrDie:在指定时间内等待消息确认,如果超时则该方法将抛出异常。异常的处理通常包括记录错误消息/重新尝试发送消息。

这种技术非常简单,但是有一个主要的缺点:由于消息的确认阻碍了后续消息的发布,会显著减缓发布的速度

2.2 批量发布消息

为了改进前面的示例,我们可以发布一批消息,然后等待确认整批消息:

int batchSize = 100;
int outstandingMessageCount = 0;
while(thereAreMessagesToPublish()){
    channel.basicPublish(EXCHANGE_NAME, queueName, properties, message.getBytes());
    outstandingMessageCount++;
    if(outstandingMessageCount == batchSize){
        channel.waitForConfirmsOrDie(5000);
        outstandingMessageCount = 0;
    }
}
if(outstandingMessageCount > 0){
    channel.waitForConfirmsOrDie(5000);
}

优点:与单个消息确认相比,批量等待消息确认大大提高了吞吐量;

缺点是:不知道在失败时到底出了什么问题,所以我们需要记录整个批处理,或者重新发布所有消息;

且该方法仍然是同步的,依然后阻碍后续消息的发布。

2.3 异步处理消息确认

通过在 客户端 上注册一个回调,就能得到确认消息的通知:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // code when message is confirmed 消息确认回调
}, (sequenceNumber, multiple) -> {
    // code when message is nack-ed 取消应答回调
})

每个回调方法,包含两个参数:

  • sequence number:序列号
  • multiple:true 表示确认多条序列号相同或更低的消息;false 表示确认一条消息

获得序列号的方法:

// 在发布消息前,获得下一次发布的序列号
int sequenceNumber = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, queueName, properties, message.getBytes());

以下示例包含一个回调,在消息确认时清理映射。

ConcurrentNavigableMap<Long,String> confirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
    if(multiple){
        ConcurrentNavigableMap<Long,String> confirmed = confirms.headMap(sequenceNumber, true);
        confirmed.clear();
    } else {
        confirms.remove(sequenceNumber);
    }
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
    String body = confirms.get(sequenceNumber);
    System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
      body, sequenceNumber, multiple);
    cleanOutstandingConfirms.handle(sequenceNumber, multiple);
})
// ... publishing code

总而言之,异步处理发布确认,通常需要以下几个步骤:

  • 将发布序列号与消息相关联
  • 在 Channel 上注册一个 confirm 监听器,当发布者 ack/nack 到达并执行操作时通知它。(序列号与消息关联机制可能还需要进行一些清理)
  • 在发布消息之前跟踪发布序列号

尽量避免在回调中重新发布一个 nack-ed 的消息,因为确认回调是在 I/O 线程中发送的,Channel 不应该在这个线程中执行操作。

更好的解决方法是:在内存中的队列对消息进行排队,该队列由发布线程轮询。像 ConcurrentLinkedQueue 这样的类是在确认回调和发布线程间传输消息的好的选择。

三、总结

  • 单独发布消息:同步等待确认;简单;吞吐量非常有限
  • 批量发布消息:同步等待批处理的确认;简单;合理的吞吐量;出问题后很难排查
  • 异步处理:最佳的性能和资源利用;良好的控制错误的情况下,可以参与正确实施

四、参考资料

文章作者: koral
文章链接: http://luokaiii.github.io/2019/12/10/后端/消息队列/RocketMQ/8.PublisherConfirms/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自