SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)

导读:本篇文章讲解 SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1、环境搭建

引入pom:

<!-- rabbitMQ -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置yaml:

server:
  port: 9090
  servlet:
    context-path: /rabbit
spring:
  application:
    name: rabbit
  # rabbitmq配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 消息开启手动确认
    listener:
      direct:
        acknowledge-mode: manual

常量类配置:
配置使用过程中可能用到的常量

public class RabbitmqContext {

    /**
     * 工作队列模式
     */
    public static String QUEUE_WORK = "queue_work";


    /**
     * 订阅发布模式
     */
    public static String QUEUE_FANOUT_ONE = "queue_fanout_one";
    public static String QUEUE_FANOUT_TWO = "queue_fanout_two";
    public static String EXCHANGE_FANOUT = "exchange_fanout";

    /**
     * 路由模式
     */
    public static String QUEUE_DIRECT_ONE = "queue_direct_one";
    public static String QUEUE_DIRECT_TWO = "queue_direct_two";
    public static String EXCHANGE_DIRECT = "exchange_direct";
    public static String ROUTING_DIRECT_ONE = "routing_direct_one";
    public static String ROUTING_DIRECT_TWO = "routing_direct_two";
    public static String ROUTING_DIRECT_THREE = "routing_direct_three";

    /**
     * 主题模式
     */
    public static String QUEUE_TOPIC_ONE = "queue_topic_one";
    public static String QUEUE_TOPIC_TWO = "queue_topic_two";
    public static String EXCHANGE_TOPIC = "exchange_topic";
    public static String ROUTING_TOPIC_ONE = "topic.one";
    public static String ROUTING_TOPIC_TWO = "topic.one.two";

    /**
     * 手动确认机制演示
     */
    public static String QUEUE_ACK = "queue_ack";

    /**
     * 延时队列模式
     */
    public static String QUEUE_DELAY = "delay_queue";
    public static String EXCHANGE_DELAY = "delay_exchange";
    public static String ROUTING_DELAY = "delay";
}

消息监听与配置:
整合SpringBoot后,通过RabbitListener监听器与Config配置实现消息的监听(消费);

消息发送:

// 引入RabbitTemplate 
@Resource
private RabbitTemplate rabbitTemplate;

// 发送消息
rabbitTemplate.convertAndSend(String routingKey, Object object);
rabbitTemplate.convertSendAndReceive(Object object);

发送消息时,通过**rabbitTemplate.convertAndSend()或者rabbitTemplate.convertSendAndReceive()**方法进行发送,区别在于:

  • convertAndSend: 消息没有顺序,不管是否消费者是否确认,会一直发送消息;
  • convertSendAndReceive: 按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间;

2、队列模式

Controller:

@RestController
public class RabbitmqController {  
    @Resource
    private RabbitTemplate rabbitTemplate;
    
	@PostMapping("/sendWork")
    public Object sendWordQueue() {
        String msg = "工作模式消息,ID:";
        // 绑定队列发送消息
        for (int i = 1; i <= 10; i++) {
            String sendMsg = msg + i;
            rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_WORK, sendMsg);
        }
        return "发送成功...";
    }
}

Config:

@Configuration
public class WorkConfig {
    /**
     * 配置队列名称 - 工作模式
     *
     * @return
     */
    @Bean
    public Queue queueWork() {
        return new Queue(RabbitmqContext.QUEUE_WORK);
    }
}

Listener:

@Component
@Slf4j
public class WorkReceiveListener {

    /**
     * @param msg     接收的文本消息
     * @param channel 通道信息
     * @param message 附加的参数信息
     */
    @RabbitListener(queues = "queue_work")
    public void receiveQueueWork1(String msg, Channel channel, Message message) {
        log.info("消费者01-接收到消息:" + msg);
    }


    @RabbitListener(queues = "queue_work")
    public void receiveQueueWork2(String msg, Channel channel, Message message) {
        log.info("消费者02-接收到消息:" + msg);
    }
}

结果:
在这里插入图片描述

3、发布订阅模式

Controller:

@RestController
public class RabbitmqController {  
    @Resource
    private RabbitTemplate rabbitTemplate;
    
	@PostMapping("/sendFanout")
    public String sendFanout() {
        String msg = "发布|订阅模式消息,ID:";
        for (int i = 1; i <= 10; i++) {
            String sendMsg = msg + i;
            // 绑定交换机发送消息,路由key为 ""
            rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_FANOUT, "", sendMsg);
        }
        return "发送成功...";
    }
}    

Config:

@Configuration
public class FanoutConfig {

    /**
     * 配置队列名称 - 发布订阅模式
     *
     * @return
     */
    @Bean
    public Queue queueFanoutOne() {
        return new Queue(RabbitmqContext.QUEUE_FANOUT_ONE);
    }

    @Bean
    public Queue queueFanoutTwo() {
        return new Queue(RabbitmqContext.QUEUE_FANOUT_TWO);
    }

    /**
     * 定义FanoutExchange类型的交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange exchangeFanout() {
        return new FanoutExchange(RabbitmqContext.EXCHANGE_FANOUT);
    }

    /**
     * 将交换机和队列进行绑定
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeOne() {
        return BindingBuilder.bind(queueFanoutOne()).to(exchangeFanout());
    }

    @Bean
    public Binding bindingExchangeTwo() {
        return BindingBuilder.bind(queueFanoutTwo()).to(exchangeFanout());
    }
}

Listener:

@Component
@Slf4j
public class FanoutReceiveListener {
    /**
     * @param msg     接收的文本消息
     * @param channel 通道信息
     * @param message 附加的参数信息
     */

    @RabbitListener(queues = "queue_fanout_one")
    public void consumerOne1(String msg, Channel channel, Message message) {
        System.out.println("queue_fanout_one队列 消费者1:收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_fanout_one")
    public void consumerOne2(String msg, Channel channel, Message message) {
        System.out.println("queue_fanout_one队列 消费者2:收到消息:" + msg);
    }

    /**
     * -------------一个队列绑定两个消费者 --------------------------------
     */
    @RabbitListener(queues = "queue_fanout_two")
    public void consumerTwo1(String msg, Channel channel, Message message) {
        System.out.println("queue_fanout_two队列 消费者1:收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_fanout_two")
    public void consumerTwo2(String msg, Channel channel, Message message) {
        System.out.println("queue_fanout_two队列 消费者2:收到消息:" + msg);
    }
}

结果:
在这里插入图片描述

4、路由模式

路由模式与发布订阅模式相同,就是定义了路由,在将队列与交换机绑定时 以及 发送消息时设置路由名称
Controller:

@RestController
public class RabbitmqController {  
    @Resource
    private RabbitTemplate rabbitTemplate;
    
	@PostMapping("/sendDirect")
    public String sendDirect() {
        String msg = "路由模式消息,ID:";
        for (int i = 1; i <= 12; i++) {
            String sendMsg = null;
            String routingKey = "";
            if (i % 2 == 0) {
                sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_TWO;
                routingKey = RabbitmqContext.ROUTING_DIRECT_TWO;
            } else if (i % 3 == 0) {
                sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_THREE;
                routingKey = RabbitmqContext.ROUTING_DIRECT_THREE;
            } else {
                sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_ONE;
                routingKey = RabbitmqContext.ROUTING_DIRECT_ONE;
            }
            // 绑定交换机,并且设置 路由Key 发送消息
            rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DIRECT, routingKey, sendMsg);
        }
        return "发送成功...";
    }
}

Config:

@Configuration
public class DirectConfig {

    /**
     * 队列一
     *
     * @return
     */
    @Bean
    public Queue directQueueOne() {
        return new Queue(RabbitmqContext.QUEUE_DIRECT_ONE);
    }

    /**
     * 队列二
     *
     * @return
     */
    @Bean
    public Queue directQueueTwo() {
        return new Queue(RabbitmqContext.QUEUE_DIRECT_TWO);
    }

    /**
     * 定义交换机 direct类型
     *
     * @return
     */
    @Bean
    public DirectExchange myDirectExchange() {
        return new DirectExchange(RabbitmqContext.EXCHANGE_DIRECT);
    }

    /**
     * 队列 绑定到交换机 再指定一个路由键
     * directQueueOne() 会找到上方定义的队列bean
     *
     * @return
     */
    @Bean
    public Binding directExchangeOne() {
        return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_ONE);
    }

    @Bean
    public Binding directExchangeTwo() {
        return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_TWO);
    }

    /**
     * 队列 绑定到交换机 再指定一个路由键
     *
     * @return
     */
    @Bean
    public Binding directExchangeThree() {
        return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_THREE);
    }
}

Listener:

@Component
@Slf4j
public class DirectReceiveListener {

    /**
     * @param msg     接收的文本消息
     * @param channel 通道信息
     * @param message 附加的参数信息
     */
    @RabbitListener(queues = "queue_direct_one")
    public void consumerOne1(String msg, Channel channel, Message message) {
        System.out.println("queue_direct_one队列 消费者1:收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_direct_one")
    public void consumerTwo(String msg, Channel channel, Message message) {
        System.out.println("queue_direct_one队列 消费者2:收到消息:" + msg);
    }


    @RabbitListener(queues = "queue_direct_two")
    public void consumerOne(String msg, Channel channel, Message message) {
        System.out.println("queue_direct_two队列 消费者1:收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_direct_two")
    public void consumerDirect(String msg, Channel channel, Message message) {
        System.out.println("queue_direct_two队列 消费者2:收到消息:" + msg);
    }
}

结果:
在这里插入图片描述

5、主题模式

与路由模式的区别在于:可以通过不同规则匹配多个路由;

Controller:

@RestController
public class RabbitmqController {  
    @Resource
    private RabbitTemplate rabbitTemplate;
    
	@PostMapping("/sendTopic")
    public String sendTopic() {
        String msg = "消息ID:";
        for (int i = 1; i <= 10; i++) {
            String sendMsg = null;
            String routingKey = "";
            if (i % 2 == 0) {
                sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_TWO;
                routingKey = RabbitmqContext.ROUTING_TOPIC_TWO;
            } else {
                sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_ONE;
                routingKey = RabbitmqContext.ROUTING_TOPIC_ONE;
            }
            // 绑定交换机,并且设置 路由Key 发送消息
            rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_TOPIC, routingKey, sendMsg);
        }
        return "发送成功...";
    }
}

Config:

@Configuration
public class TopicConfig {
    /**
     * 队列定义
     *
     * @return
     */
    @Bean
    public Queue topicQueueOne() {
        return new Queue(RabbitmqContext.QUEUE_TOPIC_ONE);
    }

    /**
     * 队列定义
     *
     * @return
     */
    @Bean
    public Queue topicQueueTwo() {
        return new Queue(RabbitmqContext.QUEUE_TOPIC_TWO);
    }

    /**
     * 定义 TopicExchange 类型交换机
     *
     * @return
     */
    @Bean
    public TopicExchange exchangeTopic() {
        return new TopicExchange(RabbitmqContext.EXCHANGE_TOPIC);
    }

    /**
     * 队列一绑定到交换机 且设置路由键为 topic.#
     *
     * @return
     */
    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");
    }

    /**
     * 队列二绑定到交换机 且设置路由键为 topic.*
     *
     * @return
     */
    @Bean
    public Binding bindingTopic2() {
        return BindingBuilder.bind(topicQueueTwo()).to(exchangeTopic()).with("topic.*");
    }
}

Listener:

@Component
@Slf4j
public class TopicReceiveListener {

    /**
     * @param msg     接收的文本消息
     * @param channel 通道信息
     * @param message 附加的参数信息
     */
    @RabbitListener(queues = "queue_topic_one")
    public void listenOne1(String msg, Channel channel, Message message) {
        System.out.println("queue_topic_one队列 消费者1,路由匹配:topic.#,收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_topic_one")
    public void listenOne2(String msg, Channel channel, Message message) {
        System.out.println("queue_topic_one队列 消费者2,路由匹配:topic.#,收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_topic_two")
    public void listenTwo1(String msg, Channel channel, Message message) {
        System.out.println("queue_topic_two队列 消费者1,路由匹配:topic.*,收到消息:" + msg);
    }

    @RabbitListener(queues = "queue_topic_two")
    public void listenTwo2(String msg, Channel channel, Message message) {
        System.out.println("queue_topic_two队列 消费者2,路由匹配:topic.*,收到消息:" + msg);
    }
}

结果:
在这里插入图片描述

6、消息手动应答机制

说明:
通过配置自定义的SimpleRabbitListenerContainerFactory,并且在监听器中通过@RabbitListener注解通过containerFactory属性设置的SimpleRabbitListenerContainerFactory,比如:@RabbitListener(containerFactory = “listenerContainerFactory”)

yml配置:
在原有的yaml基础上追加,以下配置:

spring:
  rabbitmq:
    # 确认消息已发送到队列 return
    publisher-returns: true
    # 开启消息确认机制 confirm 异步
    publisher-confirm-type: correlated
    listener:
      direct:
        # 消息开启手动确认
        acknowledge-mode: manual
        # 拒绝消息是否重回队列
        default-requeue-rejected: true

配置类:

@Configuration
@Slf4j
public class RabbitConfig {
	@Bean("listenerContainerFactory")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 如果需要批量确认消息,则做以下设置
        // 设置批量
        // factory.setBatchListener(true);
        // 设置BatchMessageListener生效
        // factory.setConsumerBatchEnabled(true);
        // 设置批量确认数量
        // factory.setBatchSize(10);

        //设置消息为手动确认
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }
}

Controller:

@RestController
public class RabbitmqController {  
    @Resource
    private RabbitTemplate rabbitTemplate;
    
	@PostMapping("/sendAck")
    public String sendAck() {
        Object msg = "这是手动确认机制的消息";
        rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_ACK, msg);
        return "发送成功...";
    }
}

Config:

@Configuration
public class AckConfig {


    /**
     * 配置队列名称 - 工作模式
     *
     * @return
     */
    @Bean
    public Queue queueConfirm() {
        return new Queue(RabbitmqContext.QUEUE_ACK);
    }
}

Listener:

@Component
@Slf4j
public class AckReceiveListener {

    /**
     * @param msg     接收的文本消息
     * @param channel 通道信息
     * @param message 附加的参数信息
     */
    @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")
    public void queueAck(String msg, Channel channel, Message message) {
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 批量手动确认机制
     *
     * @param messages
     * @param channel
     */
//    @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")
//    public void queueAck(List<Message> messages, Channel channel) {
//        try {
//            for (Message message : messages) {
//                long deliveryTag = message.getMessageProperties().getDeliveryTag();
//                String msg = new String(message.getBody(), StandardCharsets.UTF_8);
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//                System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);
//                channel.basicAck(deliveryTag, false);
//            }
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//    }
}

7、回调函数-确认机制(发布确认模式)

通过配置消息确认机制,可以监听到:消息发送的情况,消息接收的情况;

yaml:
在原有yaml的基础上,追加以下配置:

spring:
    rabbitmq:
    # 开启消息回退 return
    publisher-returns: true
    # 开启消息确认机制 confirm 异步
    publisher-confirm-type: correlated

配置类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: LiHuaZhi
 * @Date: 2021/10/24 23:06
 * @Description:
 **/
@Configuration
@Slf4j
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        /*
         * 触发条件:
         * 1、消息推送到server,但是在server里找不到交换机
         * 2、消息推送到server,找到交换机了,但是没找到队列
         * 3、消息推送到sever,交换机和队列啥都没找到
         * 4、消息推送成功
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("ConfirmCallback-------消息成功推送到MQ");
            } else {
                log.error("ConfirmCallback------消息推送到MQ失败,原因:{}", cause);
            }
        });

        /*
         * 触发条件:
         * 1、消息推送到server,找到交换机了,但是没找到队列,被交换机退回;
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", new
                    String(message.getBody()), exchange, replyText, routingKey);
        });
        return rabbitTemplate;
    }
}

使用:
当消息发送后,会自动回调RabbitConfig配置类中的rabbitTemplate.setConfirmCallback方法;

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18139.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!