从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
今日目标
掌握如何解决 RabbitMQ 消息丢失
思考:在真正的生产环境中,允许,,MQ丢失数据?如果不允许如何避免MQ消息丢失?
本文将带小伙伴从消息的可靠性出发,解决消息不丢失的问题。
1.消息可靠性
昨天已经介绍个消息从发送,到消费者接收,会经历多个过程:
producer--->exchange--->queue--->consumer
消息在每一步都有可能导致消息丢失,常见的丢失原因包括:
-
发送时丢失: -
生产者发送的消息未送达exchange -
消息到达exchange后未到达queue -
MQ宕机,queue将消息丢失 -
consumer接收到消息后未消费就宕机
针对丢失原因,RabbitMQ分别给出了解决方案:
如果您觉得本文不错,欢迎关注,点赞,收藏支持,您的关注是我坚持的动力!
2.生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
-
publisher-confirm:确认模式
消息成功投递到Exchange(交换机),返回ack
消息未投递到Exchange(交换机),返回nack
-
publisher-return:回退模式
消息投递到Exchange(交换机),但是没有路由到Queue(队列)。返回ACK,及路由失败原因

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
2.1. 实现publisher-confirm:发送者确认
定义ConfirmCallBack确认模式有两种方式:
-
1.添加全局ConfirmCallback配置
-
2.ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同(推荐方式)
实现消息从publisher(生产者)到Exchange(交换机)消息确认
【步骤一】:修改mq-publisher服务application.yml文件
spring:
rabbitmq:
publisher-confirm-type: correlated # 异步回调
配置说明:
-
publish-confirm-type:开启publisher-confirm,这里支持两种类型: -
simple:同步等待confirm结果,直到超时 -
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
【步骤二】:在mq-publisher服务对rabbitTemplate定义全局ConfirmCallBack确认模式
1.在mq-publisher服务添加全局ConfirmCallback配置:
package com.zbbmeta.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
/**
* @author springboot葵花宝典
* @description: TODO
*/
@Slf4j
@Configuration
public class ConfirmCallbackConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//从容器中获取RabbitTemplate bean对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* //消息从生产者Publisher(生产者)---->Exchange(交换机)
* @param correlationData correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
//Exchange(交换机)接收成功
System.out.println("全局ConfirmCallback接收成功");
}else {
//Exchange(交换机)接收失败
System.err.println("全局ConfirmCallback消息接收失败: "+cause);
}
}
});
}
}
2.在mq-publisher服务的PublisherController类中添加接口方法测试全局ConfirmCallback:
/**
*确认模式:
* 步骤:
*
* 1. 确认模式开启:在application.yml配置spring.rabbitmq. publisher-confirm-type 开启publisher-confirms="true"
* simple: 同步等待confirm结果,直到超时
* correlated 异步回调 推荐方式
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
* @param direct Exchange交换机名称
* @param key routingKey 路由
* @param msg message 消息
* @return
* @throws InterruptedException
*/
@GetMapping("/total/{direct}/{key}/{msg}")
public String sendMsgConfirmCallback(@PathVariable("direct") String direct,@PathVariable("key") String key,@PathVariable("msg") String msg)throws InterruptedException {
rabbitTemplate.convertAndSend("zbbmeta.direct."+direct,key,msg);
return msg;
}
【步骤三】:ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同(推荐方式)
步骤二和步骤三选一个即可
在mq-publisher服务的PublisherController类中添加接口方法进行测试局部ConfirmCallback
/**
*确认模式:
* 步骤:
*
* 1. 确认模式开启:在application.yml配置spring.rabbitmq. publisher-confirm-type 开启publisher-confirms="true"
* simple: 同步等待confirm结果,直到超时
* correlated 异步回调 推荐方式
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
* @param direct Exchange交换机名称
* @param key routingKey 路由
* @param msg message 消息
* @return
* @throws InterruptedException
*/
@GetMapping("/{direct}/{key}/{msg}")
public String sendMsgCorrelationData(@PathVariable("direct") String direct,@PathVariable("key") String key,@PathVariable("msg") String msg)throws InterruptedException {
//消息从生产者Publisher(生产者)---->Exchange(交换机)
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
System.out.println("消息发送成功: ID: "+correlationData.getId());
}else{
// 3.2 nack,消息失败
System.err.println("消息发送失败, ID: "+correlationData.getId()+" 原因: "+ result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
rabbitTemplate.convertAndSend("zbbmeta.direct."+direct,key,msg,correlationData);
return msg;
}
【步骤四】: mq-consumer 消息接收
声明一个
zbbmeta.direct.confirms
交换机绑定zbbmeta.confirms.taskqueue
队列,对应routingKey为task
/**
* 测试publisher-confirms
* 1.声明zbbmeta.confirms.taskqueue 队列
* 2.声明zbbmeta.direct.confirms交换机绑定
* 3. routingkey为"task"
* @param msg
* @throws InterruptedException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "zbbmeta.confirms.taskqueue"),
exchange = @Exchange(name = "zbbmeta.direct.confirms", type = ExchangeTypes.DIRECT),
key = {"task"}
))
public void listenDirectTaskQueue(String msg) throws InterruptedException {
System.out.println("确认模式 消费者........接收到zbbmeta.confirms.taskqueue消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
【步骤五】: 测试生产者确认
1.测试全局ConfirmCallback
1.启动
mq-publisher
和mq-consumer
两个服务,发送消息到正确交换机和队列,
打开浏览器发送http://localhost:8888/publisher/total/confirms/task/hahah
表示发送消息(haha)到zbbmeta.direct.confirms
交换机,routingKey路由为task
注意:zbbmeta.direct.confirms交换机存在
控制台结果:

发送消息到一个不存在交换机
打开浏览器发送http://localhost:8888/publisher/total/confirms11/task/hahah
表示发送消息(haha)到zbbmeta.direct.confirms11
交换机,routingKey路由为task
注意:zbbmeta.direct.confirms11交换机不存在
控制台结果:

发送消息到一个不在交换机但是routingKey不存在
打开浏览器发送http://localhost:8888/publisher/total/confirms/task11/hahah
表示发送消息(haha)到zbbmeta.direct.confirms
交换机,routingKey路由为task11
注意:zbbmeta.direct.confirms交换机存在但是路由task11不存在
控制台结果:
只要Exchange存在,无论routingKe存在与否ConfirmCallback都会表示确认成功
2.2. 定义ReturnCallback回退模式
回退模式:当消息发送给Exchange后,Exchange路由到Queue失败时才会执行 ReturnCallBack
步骤:
-
1.开启回退模式 -
2.设置ReturnCallBack -
3.设置Exchange处理消息的模式: -
1.如果消息没有路由到Queue,则丢弃消息 也就是不调用ReturnsCallback -
2.如果消息没有路由到Queue,返回给消息发送方调用ReturnCallBack (默认)
【步骤一】: 开启回退模式
开启回退模式 在application.yml 配置 设置spring.rabbitmq.publisher-returns = true
spring:
rabbitmq:
publisher-returns: true # 开启回退模式
【步骤二】: 设置ReturnCallBack
每个RabbitTemplate只能配置一个ReturnCallback,因此mq-publisher服务的
com.zbbmeta.mq.config.ConfirmCallbackConfig
类中添加ReturnCallBack:
// 设置ReturnCallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
// 投递失败,记录日志
System.err.println(
"消息发送失败,应答码: "+ returned.getReplyCode()+
", 原因: "+ returned.getReplyText()+
", 交换机: "+returned.getExchange()+
", 路由键: "+returned.getRoutingKey()+
", 消息: "+ returned.getMessage().toString()
);
// 如果有业务需要,可以重发消息
}
});

【步骤三】: 添加ReturnCallBack测试接口
在mq-publisher服务的PublisherController类中添加接口方法进行测试eturnCallback
/**
* 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
* 步骤:
* 1.开启回退模式 在application.yml 配置 设置spring.rabbitmq. publisher-returns = true
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 3.1. 如果消息没有路由到Queue,则丢弃消息
* 3.1. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
* 在application.yml 设置spring.rabbitmq.template.mandatory=true (默认)
*
*
* @param direct
* @param key
* @param msg
* @return
* @throws InterruptedException
*/
@GetMapping("/returncallback/{direct}/{key}/{msg}")
public String sendMsgReturnCallback(@PathVariable("direct") String direct,@PathVariable("key") String key,@PathVariable("msg") String msg)throws InterruptedException {
rabbitTemplate.convertAndSend("zbbmeta.direct."+direct,key,msg);
return msg;
}
【步骤四】: 测试ReturnCallBack
需求:
发送一个请求到ReturnCallBack测试接口,要求Exchange(交换机存在),但是Queue(d队列不存在)
-
1.打开浏览器http://localhost:8888/publisher/returncallback/confirms/task11/hahah
结果展示:

-
2.测试消息没有路由到Queue,则丢弃消息,那么就不会调用ReturnsCallback
在application.yml进行设置
spring:
rabbitmq:
publisher-returns: true # 开启回退模式
template:
mandatory: false
# false 消息没有路由到Queue,则丢弃消息,也就是不调用ReturnsCallback
# true 消息没有路由到Queue,返回给消息发送方调用ReturnCallBack
2.3 生产者消息确认总结
publish-confirm-type确认的是消息从publish(生产者)–>Exchange(交换机)
-
publish-confirm-type:开启publisher-confirm,这里支持两种类型: -
simple:同步等待confirm结果,直到超时 -
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns确认的是消息从Exchange(交换机)–>Queue(队列)
-
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback -
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
3. 消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
-
交换机持久化 -
队列持久化 -
消息持久化
3.1. 交换机持久化
SpringAMQP可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的交换机都是持久化的。可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示
-
1.SpringAMQP中可以通过代码指定交换机持久化
@Bean
public DirectExchange simpleExchange(){
// 三个参数:参数一:交换机名称、
// 参数二:true持久化、false 当没有queue与其绑定时是否自动删除
// 参数三: 如果服务器应在exchange不再使用时删除它,则为true
return new DirectExchange("simple.direct", true, false);
}
-
2.RabbitMQ控制台看到持久化的交换机都会带上D的标示

3.2. 队列持久化
SpringAMQP可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的队列都是持久化的。可以在RabbitMQ控制台看到持久化的队列都会带上D的标示
-
1.SpringAMQP中可以通过代码指定交换机持久化
@Bean
public DirectExchange simpleExchange(){
// 三个参数:参数一:交换机名称、
// 参数二:true持久化、false 当没有queue与其绑定时是否自动删除
// 参数三: 如果服务器应在exchange不再使用时删除它,则为true
return new DirectExchange("simple.direct", true, false);
}
-
2.RabbitMQ控制台看到持久化的队列都会带上D的标示

3.3. 消息持久化
SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
-
1.非持久化
-
2.持久化
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定
代码实现:
@GetMapping("/durable/{msg}")
public String sendMsgDurableMessage(@PathVariable("msg") String msg) {
//创建消息,将消息封装到Message
Message message = MessageBuilder
.withBody(msg.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("zbbmeta.direct.confirms","task",message);
return msg;
}
4. 消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。假设这样的场景:
-
1)RabbitMQ投递消息给消费者 -
2)消费者获取消息后,返回ACK给RabbitMQ -
3)RabbitMQ删除消息 -
4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。而SpringAMQP则允许配置三种确认模式:
-
manual:手动ack,需要在业务代码结束后,调用api发送ack。 -
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack -
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
4.1. 演示none模式
none模式下,消息投递是不可靠的,可能丢失
【步骤一】:修改mq-consumer服务的application.yml文件,添加下面内容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack ,消息投递后立即被删除
【步骤二】:在mq-consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.err.println("消费者接收到simple.queue的消息:"+msg);
// 模拟异常
System.out.println(1 / 0);
System.err.println("消息处理完成!");
}
【步骤三】:在mq-publisher服务的PublisherController类添加sendMsgHandleMessage接口
@GetMapping("/handle/{msg}")
public String sendMsgHandleMessage(@PathVariable("msg") String msg) {
rabbitTemplate.convertAndSend("simple.queue",msg);
return msg;
}
【步骤四】:测试
-
打开浏览器发送消息到 simple.queue
, 输入地址:http://localhost:8888/publisher/handle/hahha111
结果:
-
1.IDEA控制台抛出错误内容


4.2. 演示auto模式
auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
【步骤一】:再次修改mq-consumer服务的application.yml文件,添加下面内容:
把确认机制修改为auto
spring:
rabbitmq:
listener:
simple:
# auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
acknowledge-mode: auto
【步骤二】:测试
-
打开浏览器发送消息到
simple.queue
, 输入地址:http://localhost:8888/publisher/handle/hahha111
-
1.在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态)结果:

-
2.抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除

4.3. 消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力

思考:如何解决消息异常时,无限循环?
4.3.1. 本地重试
可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
-
在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了 -
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息 结论: -
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试 -
重试达到最大次数后,Spring会返回ack,消息会被丢弃
4.3.2. 失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重新尝试耗尽后,直接reject,丢弃消息。默认就是这种方式 -
ImmediateRequeueMessageRecoverer:重新尝试耗尽后,返回nack,消息重新入队 -
RepublishMessageRecoverer:重新尝试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理
-
1)在mq-consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
-
2)在mq-consumer服务中定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
4.3.3. 测试失败策略
-
-
mq-consumer控制台结果

ErrorMessageConfig完整代码:
package com.zbbmeta.mq.config;
/**
* @author springboot葵花宝典
* @description: TODO
*/
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
4.4. 演示manual模式
manual:自己根据业务情况,判断什么时候该ack
【步骤一】:再次修改mq-consumer服务的application.yml文件,添加下面内容:
把确认机制修改为manual
spring:
rabbitmq:
listener:
simple:
# auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
acknowledge-mode: manual
【步骤二】:在mq-consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue2")
public void listenSimpleQueue2(Message message, Channel channel) throws IOException {
try {
// 处理消息的业务逻辑
System.out.println(1 / 0);
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("消费者接收到simple.queue的消息:"+message);
// 发生异常时,将消息重新入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 或者使用 channel.basicReject() 方法拒绝消息,并不重新入队
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
【步骤三】:在mq-publisher服务的PublisherController类添加sendMsgManualHandleMessage接口
@GetMapping("/manual/{msg}")
public String sendMsgManualHandleMessage(@PathVariable("msg") String msg) {
rabbitTemplate.convertAndSend("simple.queue2",msg);
return msg;
}
【步骤四】:测试
-
重新启动服务,,然后打开浏览器发送消息到 simple.queue2
, 输入地址:http://localhost:8888/publisher/manual/hahha111
结果:

总结如何确保RabbitMQ消息的可靠性?
-
开启生产者确认机制,确保生产者的消息能到达队列 -
开启持久化功能,确保消息未消费前在队列中不会丢失 -
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack -
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
如果您觉得本文不错,欢迎关注,点赞,收藏支持,您的关注是我坚持的动力!
原创不易,转载请注明出处,感谢支持!如果本文对您有用,欢迎转发分享!
原文始发于微信公众号(springboot葵花宝典):从源头到终点:解密 RabbitMQ 消息丢失的神奇解决方案!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/183102.html