一、Hello World 简单Demo
Hello World:简单的一对一示例。
1. 介绍
RabbitMQ 是一个消息代理,用于接收和转发消息。类似于一个邮局,由生产者将消息放进邮箱,且生产者确信邮局会将消息传递给收件人。
名词术语:
- Producing:消息的生产者,意味着仅做消息的发送
- Queue:消息队列,即存储消息的缓冲区。多个生产者可以将消息发送至一个队列,多个消费者可以尝试从一个队列接收数据
- Consuming:消费者,用于等待接收消息的程序
注:生产者、消费者和代理,不必强制在同一台主机上。(且大部分情况下,也不会在一台主机上)
2. 图解
3. 示例
3.1 引入RabbitMQ的依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
3.2 消息生产者
消息生产者用于发送消息,具体步骤大致为:连接服务器 > 声明队列 > 发送消息:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建一个服务器连接
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个队列,参数依次为:
// 1. queue:队列名称
// 2. durable:是否持久化,为true后重启服务器将依然存在
// 3. exclusive:是否为排他队列,为true后该队列将仅用于当前服务
// 4. autoDelete:是否自动删除,为true后,将在没有服务时删除队列
// 5. arguments:Queue的其他构造参数
// 6. return:返回一个声明状态
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World2!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] sent:" + message);
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
3.3 消息消费者
消息消费者,用于接收并消费消息。
与生产者不同的是,接收者需要保持运行以侦听消息。
大致步骤为:连接服务器 > 声明队列(确保队列存在) > 消息回调(消费消息):
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 这里也声明了队列,因为如果在发布者之前启动消费者,这样可以确保队列存在
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
DeliverCallback callback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("[x] Received: " + msg);
};
// 使用消息回调,缓冲消息
channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {
});
}
}
当运行多个消费者时,生产者发送的消息,只有一个消费者能够接收到消息(按照顺序依次轮询).