RabbitMQ(二)-Work Queues工作队列、消息持久化

二、Work Queues 工作队列

Work Queues:在 works 间分配任务(竞争消费模式)。

介绍 竞争消费模式,用于多个消费者间竞争消息。

多消费者

工作队列(又名任务队列)的主要思想是避免立即执行资源消耗型任务,并且必须等待它完成。

一、思路

当消息生产者同一时间发送多个任务时

  1. 多个消费者是如何接收任务的? - 按照顺序接收并处理
  2. 任务的完成耗时过长,如何处理中途某一消费者挂了的情况? - 使用消息确认
  3. 如果任务执行过程中,RabbitMQ服务停止,如何保证任务不丢失? - 开启消息持久化
  4. 如果每个消费者的处理时长不同,如何确保一个消费者不会堆积任务? - 开启prefetchCount公平调度

下面将会按照以上思路依次讲解。

在消息接收者消费消息时,假设发送者的每个字符都会耗时1S执行,以此来模拟资源消耗型任务。

1.1 循环调度

使用任务队列的优点之一是能够轻松地并行化工作,如果堆积的任务很多,我们只需要增加更多的消费者,就能够很容易地扩张规模。

假设有三个消息消费者,同时接收6个消息。

默认情况下,RabbitMQ 将按照顺序将每条消息发送给下一个使用者

即每个消费者都会收到相同数量的消息。

1.2 消息确认

在未开启消息确认时,一旦 RabbitMQ 将消息传递给消费者后,会立即将其标记为删除。如果消费者执行某一任务耗时很长,在执行过程中挂掉,那么这条消息将会丢失,且其它尚未处理的消息也将丢失。

为了确保消息不会丢失,RabbitMQ 支持消息确认。即消费者执行完毕后,发送一个 ACK,告诉 RabbitMQ 已经接受并处理了一条特定消息。

如果消费者在没有发送 ACK 的情况下死亡(通道关闭、连接关闭、TCP连接丢失),RabbitMQ 会将消息重新进行排队(注:消息会放在队列头部,尽快执行)。

// 每次只接收一个任务
channel.basicQos(1);

DeliverCallback callback = (consumerTag, message) -> {
    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("[1] 接收消息: " + msg);

    try {
        for (int i = 0; i < msg.length(); i++) {
            Thread.sleep(1000);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("[1] 执行完毕");
        // 当任务执行完毕后,手动确认 - 确认后 RabbitMQ 才会删除消息,否则消息将会重排
        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    }
};

// 关闭自动确认,改为手动确认
boolean autoAck = false;
channel.basicConsume("queue", autoAck, deliverCallback, consumerTag -> {});

1.3 消息持久性

消息确认,确保了即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。

要确保 RabbitMQ 退出或崩溃后,队列和消息依然存在,则需要将队列和消息都标记为持久性。

// 1. 声明一个持久化消息的队列
boolean durable = true;
channel.queueDeclare("hello2", durable, false, false, null);

// 2. 将消息标记为 text plain
channel.basicPublish("", "hello2",
                     MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

注:队列只有在第一次声明时才会创建。因此这里重新声明一个 hello2 的队列。

1.4 公平调度

1.1 循环调度 中已经说明,RabbitMQ 默认按照顺序分配任务给消费者们。

如果有两个消费者,消息队列中,所有奇数列的任务都较重,偶数列的任务较轻。那么会导致消费者A的处理时间无限延长,而消费者B处于空闲的情况。

为了避免这种情况,我们将 prefetchCount 设置为 1,这样 RabbitMQ 将每次只向消费者发送一条任务。当消费者完成该任务后,再发送新的消息任务。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

二、完整的代码

2.1 消息生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private final static String QUEUE_NAME = "hello3";

    public static void main(String[] args) {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 持久化队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "任务3:00000000";
            // 消息持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("[x] sent:" + message);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}

2.2 消息消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Recv {
    private final static String QUEUE_NAME = "hello3";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("[1] Waiting for messages. To exit press CTRL+C");

        // 每次只接收一条消息
        channel.basicQos(1);

        DeliverCallback callback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("[1] 接收消息: " + msg);

            try {
                // 模拟资源消耗型任务,每个字符执行一秒
                for (int i = 0; i < msg.length(); i++) {
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("[1] 执行完毕");
                // 手动确认,消息执行完成
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };

        // 关闭自动确认,改为手动确认
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, callback, consumerTag -> {
        });
    }
}

三、参考资料

文章作者: koral
文章链接: http://luokaiii.github.io/2019/12/09/后端/消息队列/RocketMQ/3.WorkQueues/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自