RabbitMQ入门之常见模式

1.什么是MQ

MQ(Message Queue):消息队列,是一种”先进先出“的数据结构。典型的模型就是我们所说的生产者、消费者模型。生产者不断地向消息队列中生产消息,消费者不断地从消息队列中获取消息,同时消息的生产和消费都是异步的,可以实现系统间的解耦

2.什么是RabbitMQ

RabbitMQ是使用Erlang语言开发的基于高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的开源消息队列。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、数据可靠性、数据安全性

3.安装RabbitMQ

使用cat /etc/os-release查看系统版本号,我这里使用的是Ubuntu 20.04,对应的分支是focal

cat /etc/os-release

NAME="Ubuntu"
VERSION="20.04.2 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.2 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal

3.1 安装erlang和RabbitMQ

使用以下脚本快速安装:

#!/usr/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

#
# Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Cloudsmith: modern Erlang repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Cloudsmith: RabbitMQ repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null

#
# Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu focal main
deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu focal main

#
# Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu focal main
deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu focal main
EOF

#
# Update package indices
sudo apt-get update -y

#
# Install Erlang packages
sudo apt-get install -y erlang-base 
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets 
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key 
                        erlang-runtime-tools erlang-snmp erlang-ssl 
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

#
# Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

3.2 启动RabbitMQ

# 启动rabbitmq
systemctl start rabbitmq-server

#
 查看rabbitmq运行状态
systemctl status rabbitmq-server

3.3 加载web管理界面插件

# 加载RabbitMQ的插件,这样我们可以使用web界面来管理RabbitMQ,默认使用guest用户登录,且必须使用localhost:15672来访问管理界面
sudo rabbitmq-plugins enable rabbitmq_management

#
 username:guest
# password:guest

3.4 RabbitMQ配置文件

RabbitMQ给我们提供了一个配置文件模版,我们可以参照这个来配置。

模版文件地址:https://Github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example

在/etc/rabbitmq目录下创建rabbitmq.conf

# 文件名rabbitmq.conf
# 当该值为true时,我们只能通过localhost:15672来访问管理界面
# 当该值为false时,我们可以通过ip:15672来访问管理界面
loopback_users.guest = false

RabbitMQ入门之常见模式

3.5 相关命令

# 查看相关命令的使用
sudo rabbitmqctl help

4. Java整合RabbitMQ

4.1 引入依赖

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.10.0</version>
</dependency>

4.2 第一种模型(直连)

RabbitMQ入门之常见模式

直连模式下,只有一个生产者和消费者,如果消费者处理消息的速度慢,但是生产者在源源不断的生产消息,就会导致消息的挤压

4.2.1 创建生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.19");
        factory.setVirtualHost("test");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            // 声明与队列相关的参数,boolean durable 如果设置为true的话就是将队列持久化
            channel.queueDeclare(QUEUE_NAME, truefalsefalsenull);
            String message = "Hello World!";
            // String exchange, String routingKey, BasicProperties props, byte[] body
            // 发布消息
            // MessageProperties.PERSISTENT_TEXT_PLAIN  将消息持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

4.2.2  创建消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Recv {
    // 这里我们并没有使用try-with-resource语句自动关闭channel和connection,这样可以使程序一直保持运行接收消息

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.19");
        factory.setVirtualHost("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, truefalsefalsenull);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

4.3第二种模型(Work Queue)

RabbitMQ入门之常见模式

在工作队列模式中,默认使用的是轮询调度(Round-robin dispatching),RabbitMQ将会依次将消息发送给每个消费者,每个消费者将获得相同数量的消息

也可以手动设置为公平调度(Fair dispatch),即处理消息快的消费者会获得更多数量的消息来处理,处理消息慢的消费者获得的消息数量相对较少。

// 告诉RabbitMQ一次只给消费者一条消息,在该消费者处理完上一条消息之前,不再给该消费者发送消息
channel.basicQos(1);

4.3.1 生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.104");
        factory.setVirtualHost("test");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 队列持久化
            boolean durable = true;
            channel.queueDeclare(TASK_QUEUE_NAME, durable, falsefalsenull);

            for (int i = 0; i < 100; i++) {
                String message = "task_message_.";
                message = message + i;
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }

        }
    }
}

4.3.2 消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Worker1 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.104");
        factory.setVirtualHost("test");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, truefalsefalsenull);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     // 指示一次性只接收一条消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(3);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

4.3.3 消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Worker2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.104");
        factory.setVirtualHost("test");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, truefalsefalsenull);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 指示一次性只接收一条消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

4.4 第三种模型(Publish/Subscribe)

RabbitMQ入门之常见模式

在Publish/Subscribe模型中,生产者将消息发布到Exchange中,Exchange将消息推送到队列中,消费者再去队列中获得消息进行消费,这种模式是将一条消息发送给多个消费者。

RabbitMQ中消息传递模型的核心思想是,生产者从不直接向队列发送任何消息。生产者将消息发送到Exchange,Exchange将消息推送到队列中

Exchange的类型:direct,topic,headers 和fanout

**在这种模式下,我们主要介绍fanout类型的Exchange。**使用fanout类型,不需要设置routingKey,Exchange会将消息广播到与之绑定的所有的队列中。

  • direct
    • direct类型的Exchange
    • queue和Exchange绑定,并设置一个routingKey
    • 和routingKey完全匹配的消息将被路由到queue
  • topic
    • * 指代一个字符
    • # 指代一个或多个字符
    • topic类型的交换机
    • queue和Exchange绑定,并设置一个规则的routingKey
    • 匹配routingKey规则的消息,将路由到指定的queue
    • 路由规则
  • headers
  • fanout
    • fanout类型的Exchange
    • queue和Exchange绑定,不设置routingKey
    • 将收到的消息广播到与之绑定的所有队列

4.4.1 生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Provider {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            for (int i = 0; i < 10; i++) {
                String message = "测试fanout模型_" + i;
                // 因为是广播模型,所以不需要指定routingKey,消息将会推送至所有的queue
                channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

4.4.2 消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 获得临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 将交换机和queue绑定起来,无需routingKey
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

4.5 第四种模型(Routing)

RabbitMQ入门之常见模式

与Subscribe/Publish模式不同之处在于,Routing模式的Exchange类型是direct,并且queue和Exchange绑定的时候,设置了routingKey,只有routingKey完全匹配的消息才会路由到queue中。

当消息的routingKeyerror时,消息将被路由到q1,当消息的routingKeywarning/info时,消息将被路由到q2

4.5.1 生产者

public class Provider {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String routingKey = "info";
            for (int i = 0; i < 10; i++) {
                String message = "direct_routing_message_" + i;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            }
        }
    }
}

4.5.2 消费者

public class Consumer {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 获得临时队列
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

该模式下需要routingKey完全匹配的消息才能路由到queue中,在消息类型多的情况下不利于扩展,于是Topics模式诞生了

4.6 第五种模型(Topics)

RabbitMQ入门之常见模式

该模式与Routing模式的区别在于,Routing模式下,routingKey是指定的,Topics模式下,routingKey必须是一个单词列表,用.分割,最多255个字节,例如:"my.routingkey",我们可以使用通配符来指定路由规则。

当然如果我们在topics模式中,不使用特殊字符*和#,其效果和Routing模式是一样的。

*:指代一个字符

#:指代0个或多个字符

4.6.1 生产者

public class Provider {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = "message.a";
            String message ="topic_message" ;

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}

4.6.2 消费者

public class Consumer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
  // 绑定队列,并设置路由规则,消费者能消费到routingKey例如:message.a,message.b的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "message.*");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

最后,欢迎关注微信公众号一起交流


原文始发于微信公众号(阿喵程序园):RabbitMQ入门之常见模式

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/42866.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!