从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

今日目标

  • 掌握如何解决 RabbitMQ 消息丢失

思考:在真正的生产环境中,允许,,MQ丢失数据?如果不允许如何避免MQ消息丢失?

本文将带小伙伴从消息的可靠性出发,解决消息不丢失的问题。

1.消息可靠性

昨天已经介绍个消息从发送,到消费者接收,会经历多个过程

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!rabbitmq 整个消息投递的路径为:

producer--->exchange--->queue--->consumer

消息在每一步都有可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对丢失原因,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!下面我们就通过案例来演示每一个步骤。

如果您觉得本文不错,欢迎关注,点赞,收藏支持,您的关注是我坚持的动力!


2.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm:确认模式

消息成功投递到Exchange(交换机),返回ack

消息未投递到Exchange(交换机),返回nack

  • publisher-return:回退模式

消息投递到Exchange(交换机),但是没有路由到Queue(队列)。返回ACK,及路由失败原因

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

2.1. 实现publisher-confirm:发送者确认

定义ConfirmCallBack确认模式有两种方式

  • 1.添加全局ConfirmCallback配置


  • 2.ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同(推荐方式)


实现消息从publisher(生产者)到Exchange(交换机)消息确认

【步骤一】:修改mq-publisher服务application.yml文件

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 异步回调

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

【步骤二】:在mq-publisher服务对rabbitTemplate定义全局ConfirmCallBack确认模式

1.在mq-publisher服务添加全局ConfirmCallback配置:

package com.zbbmeta.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * @author springboot葵花宝典
 * @description: TODO
 */

@Slf4j
@Configuration
public class ConfirmCallbackConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //从容器中获取RabbitTemplate bean对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *    //消息从生产者Publisher(生产者)---->Exchange(交换机)
             * @param correlationData correlationData 相关配置信息
             * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause cause 失败原因
             */

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    //Exchange(交换机)接收成功
                    System.out.println("全局ConfirmCallback接收成功");
                }else {
                    //Exchange(交换机)接收失败
                    System.err.println("全局ConfirmCallback消息接收失败: "+cause);
                }
            }
        });

    }
}

2.在mq-publisher服务的PublisherController类中添加接口方法测试全局ConfirmCallback:

/**
 *确认模式:
 * 步骤:
 * 
 * 1. 确认模式开启:在application.yml配置spring.rabbitmq. publisher-confirm-type  开启publisher-confirms="true"
 *      simple: 同步等待confirm结果,直到超时
 *      correlated 异步回调 推荐方式 
 * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
 * @param direct Exchange交换机名称
 * @param key    routingKey 路由
 * @param msg    message 消息
 * @return
 * @throws InterruptedException
 */

@GetMapping("/total/{direct}/{key}/{msg}")
public String sendMsgConfirmCallback(@PathVariable("direct") String direct,@PathVariable("key") String key,@PathVariable("msg") String msg)throws InterruptedException {
    rabbitTemplate.convertAndSend("zbbmeta.direct."+direct,key,msg);
    return msg;
}

【步骤三】:ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同(推荐方式)

步骤二和步骤三选一个即可

在mq-publisher服务的PublisherController类中添加接口方法进行测试局部ConfirmCallback

/**
 *确认模式:
 * 步骤:
 *
 * 1. 确认模式开启:在application.yml配置spring.rabbitmq. publisher-confirm-type  开启publisher-confirms="true"
 *      simple: 同步等待confirm结果,直到超时
 *      correlated 异步回调 推荐方式
 * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
 * @param direct Exchange交换机名称
 * @param key    routingKey 路由
 * @param msg    message 消息
 * @return
 * @throws InterruptedException
 */

@GetMapping("/{direct}/{key}/{msg}")
public String sendMsgCorrelationData(@PathVariable("direct") String direct,@PathVariable("key") String key,@PathVariable("msg") String msg)throws InterruptedException {

    //消息从生产者Publisher(生产者)---->Exchange(交换机)
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){
                    // ack,消息成功
                   log.debug("消息发送成功, ID:{}", correlationData.getId());
                    System.out.println("消息发送成功: ID: "+correlationData.getId());
                }else{
                    // 3.2 nack,消息失败
                    System.err.println("消息发送失败, ID: "+correlationData.getId()+" 原因: "+ result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );

    rabbitTemplate.convertAndSend("zbbmeta.direct."+direct,key,msg,correlationData);
    return msg;
}

【步骤四】:  mq-consumer 消息接收

声明一个zbbmeta.direct.confirms交换机绑定zbbmeta.confirms.taskqueue队列,对应routingKey为task

/**
 * 测试publisher-confirms
 *   1.声明zbbmeta.confirms.taskqueue 队列
 *   2.声明zbbmeta.direct.confirms交换机绑定
 *   3. routingkey为"task"
 * @param msg
 * @throws InterruptedException
 */

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "zbbmeta.confirms.taskqueue"),
        exchange = @Exchange(name = "zbbmeta.direct.confirms", type = ExchangeTypes.DIRECT),
        key = {"task"}
))
public void listenDirectTaskQueue(String msg) throws InterruptedException {

    System.out.println("确认模式  消费者........接收到zbbmeta.confirms.taskqueue消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

【步骤五】: 测试生产者确认

1.测试全局ConfirmCallback

1.启动mq-publishermq-consumer两个服务,发送消息到正确交换机和队列,

打开浏览器发送http://localhost:8888/publisher/total/confirms/task/hahah

表示发送消息(haha)到zbbmeta.direct.confirms交换机,routingKey路由为task

注意:zbbmeta.direct.confirms交换机存在

控制台结果:

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
  1. 发送消息到一个不存在交换机

打开浏览器发送http://localhost:8888/publisher/total/confirms11/task/hahah

表示发送消息(haha)到zbbmeta.direct.confirms11交换机,routingKey路由为task

注意:zbbmeta.direct.confirms11交换机不存在

控制台结果:

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
  1. 发送消息到一个不在交换机但是routingKey不存在

打开浏览器发送http://localhost:8888/publisher/total/confirms/task11/hahah

表示发送消息(haha)到zbbmeta.direct.confirms交换机,routingKey路由为task11

注意:zbbmeta.direct.confirms交换机存在但是路由task11不存在

控制台结果:

只要Exchange存在,无论routingKe存在与否ConfirmCallback都会表示确认成功从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

2.2. 定义ReturnCallback回退模式

回退模式:当消息发送给Exchange后,Exchange路由到Queue失败时才会执行 ReturnCallBack

步骤:

  • 1.开启回退模式
  • 2.设置ReturnCallBack
  • 3.设置Exchange处理消息的模式:
    • 1.如果消息没有路由到Queue,则丢弃消息 也就是不调用ReturnsCallback
    • 2.如果消息没有路由到Queue,返回给消息发送方调用ReturnCallBack (默认)

【步骤一】: 开启回退模式

开启回退模式 在application.yml 配置  设置spring.rabbitmq.publisher-returns = true

spring:
  rabbitmq:
    publisher-returns: true # 开启回退模式

【步骤二】: 设置ReturnCallBack

每个RabbitTemplate只能配置一个ReturnCallback,因此mq-publisher服务的com.zbbmeta.mq.config.ConfirmCallbackConfig类中添加ReturnCallBack

// 设置ReturnCallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        // 投递失败,记录日志
        System.err.println(
                "消息发送失败,应答码: "+   returned.getReplyCode()+
                ", 原因:  "+ returned.getReplyText()+
                ", 交换机: "+returned.getExchange()+
                ", 路由键: "+returned.getRoutingKey()+
                ", 消息: "+ returned.getMessage().toString()

        );
        // 如果有业务需要,可以重发消息
    }
});
从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

【步骤三】: 添加ReturnCallBack测试接口

在mq-publisher服务的PublisherController类中添加接口方法进行测试eturnCallback

/**
 * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
 * 步骤:
 *  1.开启回退模式 在application.yml 配置  设置spring.rabbitmq. publisher-returns = true
 *  2. 设置ReturnCallBack
 *  3. 设置Exchange处理消息的模式:
 *      3.1. 如果消息没有路由到Queue,则丢弃消息
 *      3.1. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
 *          在application.yml 设置spring.rabbitmq.template.mandatory=true (默认)
 *
 *
 * @param direct
 * @param key
 * @param msg
 * @return
 * @throws InterruptedException
 */

@GetMapping("/returncallback/{direct}/{key}/{msg}")
public String sendMsgReturnCallback(@PathVariable("direct") String direct,@PathVariable("key") String key,@PathVariable("msg") String msg)throws InterruptedException {
    rabbitTemplate.convertAndSend("zbbmeta.direct."+direct,key,msg);
    return msg;
}

【步骤四】: 测试ReturnCallBack

需求:

发送一个请求到ReturnCallBack测试接口,要求Exchange(交换机存在),但是Queue(d队列不存在)

  • 1.打开浏览器http://localhost:8888/publisher/returncallback/confirms/task11/hahah

结果展示:

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
  • 2.测试消息没有路由到Queue,则丢弃消息,那么就不会调用ReturnsCallback


在application.yml进行设置

spring:
  rabbitmq:
    publisher-returns: true  # 开启回退模式
    template:
      mandatory: 
false
      
# false 消息没有路由到Queue,则丢弃消息,也就是不调用ReturnsCallback
      
# true  消息没有路由到Queue,返回给消息发送方调用ReturnCallBack
      

2.3 生产者消息确认总结

publish-confirm-type确认的是消息从publish(生产者)–>Exchange(交换机)

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

publish-returns确认的是消息从Exchange(交换机)–>Queue(队列)

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

3. 消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制

  • 交换机持久化
  • 队列持久化
  • 消息持久化

3.1. 交换机持久化

SpringAMQP可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的交换机都是持久化的。可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示

  • 1.SpringAMQP中可以通过代码指定交换机持久化
@Bean
public DirectExchange simpleExchange(){
    // 三个参数:参数一:交换机名称、
    //          参数二:true持久化、false 当没有queue与其绑定时是否自动删除
    //          参数三: 如果服务器应在exchange不再使用时删除它,则为true
    return new DirectExchange("simple.direct"truefalse);
}
  • 2.RabbitMQ控制台看到持久化的交换机都会带上D的标示
从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

3.2. 队列持久化

SpringAMQP可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的队列都是持久化的。可以在RabbitMQ控制台看到持久化的队列都会带上D的标示

  • 1.SpringAMQP中可以通过代码指定交换机持久化
@Bean
public DirectExchange simpleExchange(){
    // 三个参数:参数一:交换机名称、
    //          参数二:true持久化、false 当没有queue与其绑定时是否自动删除
    //          参数三: 如果服务器应在exchange不再使用时删除它,则为true
    return new DirectExchange("simple.direct"truefalse);
}
  • 2.RabbitMQ控制台看到持久化的队列都会带上D的标示
从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

3.3. 消息持久化

SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode

  • 1.非持久化


  • 2.持久化


默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定

代码实现:

@GetMapping("/durable/{msg}")
public String sendMsgDurableMessage(@PathVariable("msg") String msg) {

    //创建消息,将消息封装到Message
    Message message = MessageBuilder
            .withBody(msg.getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    rabbitTemplate.convertAndSend("zbbmeta.direct.confirms","task",message);
    return msg;
}

4. 消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。假设这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

4.1. 演示none模式

none模式下,消息投递是不可靠的,可能丢失

【步骤一】:修改mq-consumer服务的application.yml文件,添加下面内容:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack ,消息投递后立即被删除

【步骤二】:在mq-consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {

    System.err.println("消费者接收到simple.queue的消息:"+msg);
    // 模拟异常
    System.out.println(1 / 0);
    System.err.println("消息处理完成!");
}

【步骤三】:在mq-publisher服务的PublisherController类添加sendMsgHandleMessage接口

@GetMapping("/handle/{msg}")
public String sendMsgHandleMessage(@PathVariable("msg") String msg) {

    rabbitTemplate.convertAndSend("simple.queue",msg);
    return msg;
}

【步骤四】:测试

  • 打开浏览器发送消息到simple.queue, 输入地址:http://localhost:8888/publisher/handle/hahha111

结果:

  • 1.IDEA控制台抛出错误内容
从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
  • 2.simple.queue队列中消息已经被消费

  • 从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

4.2. 演示auto模式

auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack

【步骤一】:再次修改mq-consumer服务的application.yml文件,添加下面内容:

把确认机制修改为auto

spring:
  rabbitmq:
    listener:
      simple:
    # auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
        acknowledge-mode: auto 

【步骤二】:测试

  • 打开浏览器发送消息到simple.queue, 输入地址:http://localhost:8888/publisher/handle/hahha111

  • 1.在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态)结果:

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
  • 2.抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除
从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

4.3. 消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

思考:如何解决消息异常时,无限循环?

4.3.1. 本地重试

可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息 
    结论:
  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

4.3.2. 失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重新尝试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重新尝试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重新尝试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理

  • 1)在mq-consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue"true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

  • 2)在mq-consumer服务中定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct""error");
}

4.3.3. 测试失败策略


    1. mq-consumer控制台结果
从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

    1. RabbitMQ控制台结果,将消息传递到error.queue从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

ErrorMessageConfig完整代码:

package com.zbbmeta.mq.config;

/**
 * @author springboot葵花宝典
 * @description: TODO
 */

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue"true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct""error");
    }
}

4.4. 演示manual模式

manual:自己根据业务情况,判断什么时候该ack

【步骤一】:再次修改mq-consumer服务的application.yml文件,添加下面内容:

把确认机制修改为manual

spring:
  rabbitmq:
    listener:
      simple:
    # auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
        acknowledge-mode: manual 

【步骤二】:在mq-consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:

@RabbitListener(queues = "simple.queue2")
public void listenSimpleQueue2(Message message, Channel channel) throws IOException {

    try {
        // 处理消息的业务逻辑
        System.out.println(1 / 0);

        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        System.err.println("消费者接收到simple.queue的消息:"+message);
        // 发生异常时,将消息重新入队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
        // 或者使用 channel.basicReject() 方法拒绝消息,并不重新入队
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}

【步骤三】:在mq-publisher服务的PublisherController类添加sendMsgManualHandleMessage接口

@GetMapping("/manual/{msg}")
public String sendMsgManualHandleMessage(@PathVariable("msg") String msg) {

    rabbitTemplate.convertAndSend("simple.queue2",msg);
    return msg;
}

【步骤四】:测试

  • 重新启动服务,,然后打开浏览器发送消息到simple.queue2, 输入地址:http://localhost:8888/publisher/manual/hahha111

结果:

从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

总结如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

如果您觉得本文不错,欢迎关注,点赞,收藏支持,您的关注是我坚持的动力!

原创不易,转载请注明出处,感谢支持!如果本文对您有用,欢迎转发分享!


原文始发于微信公众号(springboot葵花宝典):从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/183102.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!