RocketMQ(一)-Hello World 简单Demo

一、Hello World 简单Demo

Hello World:简单的一对一示例。

1. 介绍

RabbitMQ 是一个消息代理,用于接收和转发消息。类似于一个邮局,由生产者将消息放进邮箱,且生产者确信邮局会将消息传递给收件人。

名词术语:

  • Producing:消息的生产者,意味着仅做消息的发送
  • Queue:消息队列,即存储消息的缓冲区。多个生产者可以将消息发送至一个队列,多个消费者可以尝试从一个队列接收数据
  • Consuming:消费者,用于等待接收消息的程序

注:生产者、消费者和代理,不必强制在同一台主机上。(且大部分情况下,也不会在一台主机上)

2. 图解

点对点模式

3. 示例

3.1 引入RabbitMQ的依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.4.3</version>
</dependency>

3.2 消息生产者

消息生产者用于发送消息,具体步骤大致为:连接服务器 > 声明队列 > 发送消息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        // 创建一个服务器连接
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个队列,参数依次为:
            // 1. queue:队列名称
            // 2. durable:是否持久化,为true后重启服务器将依然存在
            // 3. exclusive:是否为排他队列,为true后该队列将仅用于当前服务
            // 4. autoDelete:是否自动删除,为true后,将在没有服务时删除队列
            // 5. arguments:Queue的其他构造参数
            // 6. return:返回一个声明状态
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 发送消息
            String message = "Hello World2!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] sent:" + message);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}

3.3 消息消费者

消息消费者,用于接收并消费消息。

与生产者不同的是,接收者需要保持运行以侦听消息。

大致步骤为:连接服务器 > 声明队列(确保队列存在) > 消息回调(消费消息):

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 这里也声明了队列,因为如果在发布者之前启动消费者,这样可以确保队列存在
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("[*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback callback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("[x] Received: " + msg);
        };

        // 使用消息回调,缓冲消息
        channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {
        });
    }
}

当运行多个消费者时,生产者发送的消息,只有一个消费者能够接收到消息(按照顺序依次轮询).

4. 参考资料

文章作者: koral
文章链接: http://luokaiii.github.io/2019/12/09/后端/消息队列/RocketMQ/2.HelloWorld/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自