RabbitMQ生产端消息可靠性投递实现

导读:本篇文章讲解 RabbitMQ生产端消息可靠性投递实现,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

常见的解决方案有两种:
1.消息落库,对消息状态进行打标
2.消息的延迟投递,做二次确认,回调检查

这里采用第一种方案
实现流程:

  1. 首先将将要发送的业务数据持久化到业务数据库中,消息状态的数据持久化到消息数据库中(初始状态为消息投递中)。
  2. 生产者发送消息到RabbitMQ队列中,RabbitMQ开启确认回调,生产者监听确认回调,如果监听到则更新消息数据库中的状态为消息投递成功
  3. 分布式定时任务获取所有投递中的消息,进行重发,如果某个任务的重发次数大于某个值分布式定时任务修改该消息状态为投递失败

1.创建消息状态数据库
对应实体类MaiLog
在这里插入图片描述

2.定义所需常量

public class MailConstants {
    //消息投递中
    public static final Integer DELIVERING = 0;
    //消息投递成功
    public static final Integer SUCCESS = 1;
    //消息投递失败
    public static final Integer FAILURE = 2;
    //最大重试次数
    public static final Integer MAX_TRY_COUNT = 3;
    //消息超时时间
    public static final Integer MSG_TIMEOUT = 1;
    //队列
    public static final String QUEUE = "mail.queue";
    //交换机
    public static final String EXCHANGE = "mail.exchange";
    //路由键
    public static final String ROUTING_KEY = "mail.routing.key";
}

3.在项目中发送消息的地方进行消息落库

            String msgId = UUID.randomUUID().toString();
            MailLog mailLog = new MailLog();
            mailLog.setMsgId(msgId);
            mailLog.setEId(employee.getId());
            mailLog.setStatus(0);
            mailLog.setRouteKey(MailConstants.ROUTING_KEY);
            mailLog.setExchange(MailConstants.EXCHANGE);
            mailLog.setCount(MailConstants.MAX_TRY_COUNT);
            //重试时间为当前时间加上消息超时时间
            mailLog.setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT));
            mailLog.setCreateTime(LocalDateTime.now());
            mailLog.setUpdateTime(LocalDateTime.now());
            mailLogMapper.insert(mailLog);
            //发送消息
            rabbitTemplate.convertAndSend(MailConstants.EXCHANGE,MailConstants.ROUTING_KEY,emp,new CorrelationData(msgId));

4.开启消息回调

@Configuration
public class RabbitMQConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @Autowired
    private MailLogService mailLogService;

    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        /**
         * 消息确认回调
         * data消息唯一表示,消息Id
         * ack确认结果
         * cause:失败原因
         */
        rabbitTemplate.setConfirmCallback((data,ack,cause)->{
            String msgId = data.getId();
            if (ack){
                LOGGER.info("=======>消息发送成功",msgId);
                mailLogService.update(new UpdateWrapper<MailLog>().set("status",1).eq("msg_id",msgId));
            }else {
                LOGGER.error("=======>消息发送失败",msgId);
            }
        });
        /**
         * 消息失败回调
         * msg:消息主题
         * repCode:响应码
         * repText:响应描述
         * exchange:交换机
         * routingkey:路由键
         */
        rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{
            LOGGER.error("==========>消息发送到队列时失败",msg.getBody());
        });
        return rabbitTemplate;
    }



    @Bean
    public Queue queue(){
        return new Queue(MailConstants.QUEUE);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(MailConstants.EXCHANGE);
    }

    /**
     * 队列和交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.ROUTING_KEY);
    }
}

yml

    #消息确认回调
    publisher-confirm-type: correlated
    #消息失败回调
    publisher-returns: true

5.定时任务

@Component
public class MailTask {
    @Autowired
    private MailLogService mailLogService;
    @Autowired
    private EmployeeService employeeService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

//每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ?")
    public void mailTask(){
        //在发送中且发送超时的
        List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>().eq("status", 0).lt("try_time", LocalDateTime.now()));
        for (MailLog mailLog : list) {
        //重试次数超过三次,更新状态为投递失败
            if (mailLog.getCount()>=3){
                mailLogService.update(new UpdateWrapper<MailLog>().set("status",2).eq("msg_id",mailLog.getMsgId()));
            }
            mailLogService.update(new UpdateWrapper<MailLog>().set("count",mailLog.getCount()+1)
                    .set("update_time",LocalDateTime.now())
                    .set("try_time",LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
                    .eq("msg_id",mailLog.getMsgId()));
            Employee employee = employeeService.getEmployee(mailLog.getEId()).get(0);
            //消息重发
            rabbitTemplate.convertAndSend(MailConstants.EXCHANGE,MailConstants.ROUTING_KEY,employee,new CorrelationData(mailLog.getMsgId()));
        }
    }
}

6.一次发送成功测试
在这里插入图片描述
7.模拟发送失败,重试一次测试
这里报发送时间交换机改为错误的从而模拟第一次发送失败
在这里插入图片描述

在这里插入图片描述
8.把定时任务中的发送也改错,模拟三次重试失败
在这里插入图片描述
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/4967.html

(0)
小半的头像小半

相关推荐

极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!