二、Work Queues 工作队列
Work Queues:在 works 间分配任务(竞争消费模式)。
介绍 竞争消费模式,用于多个消费者间竞争消息。
工作队列(又名任务队列)的主要思想是避免立即执行资源消耗型任务,并且必须等待它完成。
一、思路
当消息生产者同一时间发送多个任务时
- 多个消费者是如何接收任务的? - 按照顺序接收并处理
- 任务的完成耗时过长,如何处理中途某一消费者挂了的情况? - 使用消息确认
- 如果任务执行过程中,RabbitMQ服务停止,如何保证任务不丢失? - 开启消息持久化
- 如果每个消费者的处理时长不同,如何确保一个消费者不会堆积任务? - 开启prefetchCount公平调度
下面将会按照以上思路依次讲解。
在消息接收者消费消息时,假设发送者的每个字符都会耗时1S执行,以此来模拟资源消耗型任务。
1.1 循环调度
使用任务队列的优点之一是能够轻松地并行化工作,如果堆积的任务很多,我们只需要增加更多的消费者,就能够很容易地扩张规模。
假设有三个消息消费者,同时接收6个消息。
默认情况下,RabbitMQ 将按照顺序将每条消息发送给下一个使用者。
即每个消费者都会收到相同数量的消息。
1.2 消息确认
在未开启消息确认时,一旦 RabbitMQ 将消息传递给消费者后,会立即将其标记为删除。如果消费者执行某一任务耗时很长,在执行过程中挂掉,那么这条消息将会丢失,且其它尚未处理的消息也将丢失。
为了确保消息不会丢失,RabbitMQ 支持消息确认。即消费者执行完毕后,发送一个 ACK,告诉 RabbitMQ 已经接受并处理了一条特定消息。
如果消费者在没有发送 ACK 的情况下死亡(通道关闭、连接关闭、TCP连接丢失),RabbitMQ 会将消息重新进行排队(注:消息会放在队列头部,尽快执行)。
// 每次只接收一个任务
channel.basicQos(1);
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("[1] 接收消息: " + msg);
try {
for (int i = 0; i < msg.length(); i++) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] 执行完毕");
// 当任务执行完毕后,手动确认 - 确认后 RabbitMQ 才会删除消息,否则消息将会重排
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
// 关闭自动确认,改为手动确认
boolean autoAck = false;
channel.basicConsume("queue", autoAck, deliverCallback, consumerTag -> {});
1.3 消息持久性
消息确认,确保了即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。
要确保 RabbitMQ 退出或崩溃后,队列和消息依然存在,则需要将队列和消息都标记为持久性。
// 1. 声明一个持久化消息的队列
boolean durable = true;
channel.queueDeclare("hello2", durable, false, false, null);
// 2. 将消息标记为 text plain
channel.basicPublish("", "hello2",
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注:队列只有在第一次声明时才会创建。因此这里重新声明一个 hello2 的队列。
1.4 公平调度
在 1.1 循环调度 中已经说明,RabbitMQ 默认按照顺序分配任务给消费者们。
如果有两个消费者,消息队列中,所有奇数列的任务都较重,偶数列的任务较轻。那么会导致消费者A的处理时间无限延长,而消费者B处于空闲的情况。
为了避免这种情况,我们将 prefetchCount 设置为 1,这样 RabbitMQ 将每次只向消费者发送一条任务。当消费者完成该任务后,再发送新的消息任务。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
二、完整的代码
2.1 消息生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String QUEUE_NAME = "hello3";
public static void main(String[] args) {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "任务3:00000000";
// 消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] sent:" + message);
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
2.2 消息消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Recv {
private final static String QUEUE_NAME = "hello3";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[1] Waiting for messages. To exit press CTRL+C");
// 每次只接收一条消息
channel.basicQos(1);
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("[1] 接收消息: " + msg);
try {
// 模拟资源消耗型任务,每个字符执行一秒
for (int i = 0; i < msg.length(); i++) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] 执行完毕");
// 手动确认,消息执行完成
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
// 关闭自动确认,改为手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, callback, consumerTag -> {
});
}
}