【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

有目标就不怕路远。年轻人.无论你现在身在何方.重要的是你将要向何处去。只有明确的目标才能助你成功。没有目标的航船.任何方向的风对他来说都是逆风。因此,再遥远的旅程,只要有目标.就不怕路远。没有目标,哪来的劲头?一车尔尼雷夫斯基

导读:本篇文章讲解 【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

目录

一、消息队列

1.1、发布确认模式

1.2、案例代码

(1)引入依赖

(2)编写生产者【消息确认–单条确认】

(3)编写生产者【消息确认–批量确认】

(4)编写生产者【消息确认–异步确认】


一、消息队列

1.1、发布确认模式

RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???

RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。

1.2、案例代码

(1)引入依赖

<!-- 引入 RabbitMQ 依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

(2)编写生产者【消息确认–单条确认】

  • 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
  • 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
  • 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class Producer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // TODO 开启消息确认机制
            channel.confirmSelect();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_direct_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // 6、发送消息
            for (int i = 0; i < 10; i++) {
                // 路由键唯一标识
                String routingKey = "error";
                if (i % 3 == 0) {
                    routingKey = "info";
                } else if (i % 3 == 1) {
                    routingKey = "warn";
                }
                String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                // 等待RabbitMQ返回ACK标识
                boolean wait = channel.waitForConfirms();
                System.out.println("RabbitMQ是否接收成功: " + wait);
                if (!wait) {
                    // 消息发送失败,则可以重新发送
                    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

(3)编写生产者【消息确认–批量确认】

  • 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
  • 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
  • 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
  • 所以,可以通过捕获IOException异常来判断消息是否发送成功。
  • 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class ProducerBatch {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // TODO 开启消息确认机制
            channel.confirmSelect();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_direct_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // 6、发送消息
            int batchSize = 3;
            int count = 0;
            for (int i = 0; i < 10; i++) {
                // 路由键唯一标识
                String routingKey = "error";
                if (i % 3 == 0) {
                    routingKey = "info";
                } else if (i % 3 == 1) {
                    routingKey = "warn";
                }
                String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                // 批量确认
                if (count == batchSize) {
                    // 等待RabbitMQ返回ACK标识
                    channel.waitForConfirmsOrDie();
                    count = 0;
                }
                count++;
            }
        } catch (IOException e) {
            System.out.println("消息发送失败啦");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

(4)编写生产者【消息确认–异步确认】

  • 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
  • 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class ProducerAsync {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // TODO 开启消息确认机制
            channel.confirmSelect();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_confirm_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // TODO 一定要先调用监听接口,在发送消息
            channel.addConfirmListener(new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag 
                            + ",批量发送多条消息multiple=" + multiple);
                }
            }, new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("RabbitMQ接收失败啦.....");
                }
            });
            for (int i = 0; i < 10; i++) {
                // 6、发送消息
                String message = "这是发布确认模式,发送的消息数据";
                channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());
            }
        } catch (IOException e) {
            System.out.println("消息发送失败啦");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。

综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/134507.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!