RabbitMQ中限流操作

导读:本篇文章讲解 RabbitMQ中限流操作,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

在RabbitMQ中为什么会有限流这种操作的存在呢?

想一下,如果生产端投递量几十万条数据,假如这些消息在队列里堆积了。当消费端去消费的时候瞬间涌上来,数据量那么大,消费端一时承受不了,很大可能会垮掉。因此为了减轻压力,需要使用限流操作。

那么,应该怎样实现限流操作呢?

服务端设置接收broker每次推送消息的个数,设置ack手动签收,当消费端处理完当次推送的消息后,broker会再次推送一定个数的消息。如果没处理完,就不让broker推送消息,直至消费端成功签收。

生产端:

package com.wy.testrabbitmq.xianliu;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-12 17:20
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_xianliu_exchange";
        String routingkey = "test.xianliu";
        String msg = "test xainliu message";
        for (int i = 0; i < 3; i++) {
            channel.basicPublish(exchangeName, routingkey, true, null, msg.getBytes());
        }
    }
}

消费端:

package com.wy.testrabbitmq.xianliu;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-12 17:21
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_xianliu_exchange";
        String routingkey = "test.#";
        String quequName = "test_xianliu_queueName";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(quequName, true, false, false, null);
        channel.queueBind(quequName, exchangeName, routingkey);
        // prefetchSize=0表示:消息大小是否限制(听说rabbitMQ没实现这个)
        // prefetchCount=1表示:broker一次性推送给消费端消息个数不大于1
        // global=true/false:表示限制是在channel上还是在consumer上(听说rabbitMQ没实现这个)
        channel.basicQos(0, 1, false);
        //使用限流必须设置ack手动签收
        channel.basicConsume(quequName, false, new TestConsumer(channel));

    }

}

监听类:

这个很关键:channel.basicAck(envelope.getDeliveryTag(), false);

第一个参数表示:该消息的index,仔细看你会发现这个deliveryTag是自增的。

第二个参数表示:true/false 是否批量处理(一般来说qos都是定义一次签收一个消息设置为false,如果你定义的是多个,这里可以写成true。定义为true时会把小于deliveryTag的所有消息都ack,所有小于deliveryTag的消息其实就是你qos中限制的个数。)

package com.wy.testrabbitmq.xianliu;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * 自定义消费端监听类
 *
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-18 15:31
 */
public class TestConsumer extends DefaultConsumer {
    Channel channel;

    public TestConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
        System.out.println("s:" + s);
        System.out.println("envelope:" + envelope);
        System.out.println("basicProperties:" + basicProperties);
        System.out.println("body:" + new String(body));

        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
测试:
  • 注释掉:channel.basicAck(envelope.getDeliveryTag(), false);

由于我定义的是一次推送一个消息,如下图可以看到生产端总共有3条消息,1条在处理,剩下的2条在等待,等这一条处理完再推送。
在这里插入图片描述

  • 去掉注释:可以看到,依次都签收了。
    在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/115810.html

(0)
Java光头强的头像Java光头强

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!