自定义平台MQ【值得一学】


  • 一、引入问题

  • 二、流程图

    • 2-1、系统交互流程图

    • 2-2、具体服务内部流转图

  • 三、代码实现

    • 3-1、代码 (基于RabbitMQ实现)

    • 3-2、使用

  • 四、衍生问题

    • 4-1、是否需要配置化注入

    • 4-2、Spring组件扫描

    • 4-3、如何定义事件类型公共属性

    • 4-4、事件类型如何处理

  • 五、源码获取


之前面试的时候都会被问到为什么使用MQ,使用MQ的好处是什么,我都会照本宣科的说:异步、解耦、削峰,这几个词也好理解,都是字面意思,今天我们就来进一步加深理解异步和结解耦。


一、引入问题


先思考这样一个问题,在多个系统之间我们想要异步的调用怎么做呢?当然MQ就是一个很好的解决办法

  • 如何去用呢?在A系统引入MQ,作为生产者,在B系统也引入MQ做消费者,当然可以实现功能,但会不会很麻烦?每个系统都要引入一套重复的东西。
  • 大多数我们业务场景的并发量其实很小,如果我们对每个业务场景都弄一个自己的queue 是不是很浪费?管理起来也很麻烦。
  • 如果有一个场景当我们做了某个操作之后,我们要通知A、B、C…系统来做相对应的处理,又该如何去做呢?(系统只会越来越多)

对于上面的问题,我们可以给出一个解决方案,那就是我们可以定义一个平台MQ,做成一个starter,谁要用我们就引入这个pom,每个项目都有自己的spring.application我们以这个为队列名称,注册到我们MQ里面,做一个广播消息,每一个服务既可以做生产者,也可以做消费者。


其实这里会有一个问题,比如我A服务发送一个消息了,B、C、D…服务都接收到了这个消息,但实际上只有B服务是需要消费这个消息的。很多人可能和我最开始的思路一样我在消息体里面加一个type,根据这个type来去判断谁消费。

// 接受到了消息,拿到了type
if(type == 1) {
 // ...
}else if(type == 2) {
 // ...
}
....

上面的代码当然可以解决我们的问题,但是想想每次新增一个事件都得去修改原本的接受逻辑,太low了。Spring框架里面已经做了这个操作ApplicationEventPublisher 通过这个类,我们就可以做到请求分发,根据class类型来。(具体后面讲解)


二、流程图

基于上面的理解,我画出了基础的流程图


2-1、系统交互流程图

每个服务都引入基础的mq-starter底包

每一个服务都可以作生产者,但每一个服务都是消费者。自定义平台MQ【值得一学】


2-2、具体服务内部流转图

自定义平台MQ【值得一学】
在这里插入图片描述


三、代码实现

自定义平台MQ【值得一学】
在这里插入图片描述


3-1、代码 (基于RabbitMQ实现)

AutoConfigurationPlatformMq

这就是自动注入的核心代码了

import com.xdx97.mq.consumer.PlatformConsumer;
import com.xdx97.mq.provider.PlatformMqProvider;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;


/**
 * 平台队列自动注入
 *
 * @author xdx
 * @date 22/04/15
 */

@Configuration
public class AutoConfigurationPlatformMq {

    /**
     * 平台队列交换机名称
     */

    public static final String PLATFORM_EXCHANGE = "platform_exchange";

    /**
     * 当前项目队列名称
     */

    private final String projectQueue;

    /**
     * 当前项目路由key
     */

    private final String projectRouteKey;

    public AutoConfigurationPlatformMq(Environment environment) {
        this.projectQueue = environment.getRequiredProperty("spring.application.name") + ".platform";
        this.projectRouteKey = "spring.application.name." + environment.getRequiredProperty("spring.application.name");
    }

    /**
     * 注入队列
     * @return
     */

    @Bean
    public Queue platformQueue() {
        return QueueBuilder
                .durable(projectQueue)
                .build();
    }

    /**
     * 注入交换机
     * @return
     */

    @Bean
    public Exchange platformExchange() {
        return ExchangeBuilder
                .fanoutExchange(PLATFORM_EXCHANGE)
                .durable(true)
                .build();
    }

    /**
     * 队列绑定交换机
     * @return
     */

    @Bean
    public Binding platformBinding() {
        return BindingBuilder
                .bind(platformQueue())
                .to(platformExchange())
                .with("*")
                .and(null);
    }

    /**
     * 注入生产者
     * @return
     */

    @Bean
    public PlatformMqProvider platformMqProvider(){
        return new PlatformMqProvider();
    }

    /**
     * 注入消费者
     * @return
     */

    @Bean
    public PlatformConsumer platformConsumer(){
        return new PlatformConsumer();
    }

}


PlatformMqProvider

import com.xdx97.mq.AutoConfigurationPlatformMq;
import com.xdx97.mq.event.PlatformEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;

import Javax.annotation.Resource;
import java.util.function.Consumer;

/**
 * 消息发送类
 *
 * @author xdx
 * @date 22/04/15
 */

@Slf4j
public class PlatformMqProvider {

    @Value("${spring.application.name}")
    private String source;

    @Resource
    private AmqpTemplate rabbitMqTemplate;

    /**
     * 发送平台消息
     *
     * @param platformEvent 平台事件消息
     */

    public void sendPlatformMessage(PlatformEvent platformEvent) {
        platformEvent.setSource(source);
        rabbitMqTemplate.convertAndSend(AutoConfigurationPlatformMq.PLATFORM_EXCHANGE,"*", platformEvent);
    }


    /**
     * 发送其它消息
     */

    public void sendOtherMessage(Consumer<AmqpTemplate> consumer) {
        consumer.accept(this.rabbitMqTemplate);
    }

}


PlatformConsumer

import com.xdx97.mq.event.PlatformEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.messaging.handler.annotation.Payload;

/**
 * 平台事件消费者
 *
 * @author xdx
 * @date 22/04/15
 */

@Slf4j
public class PlatformConsumer implements ApplicationEventPublisherAware {

    @Autowired
    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    @RabbitHandler
    @RabbitListener(queues = "${spring.application.name}.platform")
    public void handler(@Payload PlatformEvent message) {
        log.info("接受到平台事件消息:{}",message.toString());
        publisher.publishEvent(message);
    }

}


PlatformEvent

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 平台事件消息父类
 *
 * @author xdx
 * @date 22/04/15
 */

@Data
public class PlatformEvent implements Serializable {

    private static final long serialVersionUID=1L;

    /**
     * 系统来源
     */

    private String source;

    /**
     * 唯一标示id
     */

    private String transactionNo = UUID.randomUUID().toString();

    /**
     * 发送消息时间
     */

    private LocalDateTime eventTimeStamp = LocalDateTime.now();
}


pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xdx97.mq</groupId>
    <artifactId>xdx-mq-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.2.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <properties>
        <maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

</project>


3-2、使用

导入pom依赖

<groupId>com.xdx97.mq</groupId>
<artifactId>xdx-mq-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>


定义一个事件

其实就是一个继承PlatformEvent的实体类 (注:这个是要放在底包里面的)

@Data
public class TestEvent extends PlatformEvent{
    
    private String name;
}


发送消息

任意服务

@Resource
private PlatformMqProvider platformMqProvider;

public void fun() {
    TestEvent testEvent = new TestEvent();
    testEvent.setName("小道仙");
    
    platformMqProvider.sendPlatformMessage(testEvent);
}


消费消息

任意服务

@EventListener(TestEvent.class)
public void testListener(TestEvent testEvent)
{
    // 业务处理
    
}


四、衍生问题

4-1、是否需要配置化注入

Spring里面提供一类以 @ConditionalOn 开头的注解,可以理解成在一定条件下进行注入

  • @ConditionalOnBean 当容器中存在某个bean才进行注入

  • @ConditionalOnProperty 当配置文件满足什么条件才进行注入

所以我在设计之初考虑如果我可以用这种方式去控制何时注入queue、何时注入exchange、何时注入生产者…

这看似提高了灵活度,但是仔细思考一下,别人如果引入你的包,不去把队列绑定到平台事件上,那就相当于无法发送消息和消费消息,那引入这个包的意义何在?


4-2、Spring组件扫描

最开始我的消费者和生产者都是如下这样去定义的,每个类上面都加了两个注解@Slf4j @Component

@Slf4j
@Component
public class PlatformConsumer implements ApplicationEventPublisherAware 

@Slf4j日志注解自不必说,@Component 是注入bean的,但有一个==前提是你的项目可以扫描到这个包==

我们项目包名都是以公司的域名来命名的,而且扫描的范围一般都很大,基于这两个前提下我这个mq-starter是没有问题的,可以发送可以接受。

但如果使用者的项目包名不是以你这个命名的,那就完蛋。

这里理解一下自动注入,也就是引入了你的pom文件后,底包里面的bean应该是要自动注入的,上面这种做法不是自动注入,而是使用者的项目去扫描到而注入的。

改造后直接把bean注入放在AutoConfigurationPlatformMq 里面就好了


4-3、如何定义事件类型公共属性

所谓事件类型公共属性就是PlatformEvent类了,这个其实和功能实现无关,在我最开始实现了接受消息和发送消息后,我就以为我这个mq-starter已经完成了,当时这个类里面只有一个transactionNo 唯一标示id

这也是demo和生产最大的区别,如果我自己搭建demo,至此已经完美结束了

但在生产不行,平台事件是每个服务都可以发送的,只一个id无法知道具体的来源,后面在组长的帮助下加了系统来源和时间

这一点也很重要,一个完整的工程,不仅仅是代码功能的实现,还有业务的考量,还有代码的优美,比如你都命名a、b、c  这合理吗?


4-4、事件类型如何处理

所谓的事件类型就是一个个消息class,只要是继承了PlatformEvent的类都算

原本我是想在生产者端定义一个A.class 去继承PlatformEvent,在消费者端也同样来一个A.class 去继承PlatformEvent

想一想,这样的两个A.class是一样的吗?

最终解决办法是把这A.class放在底包里面就好了,这样消费者和生产者引入的都是一个A.class了,每次新增只需要重新 mvn deploy 一下即可。需要用到的服务去更新一个maven之前旧的服务不更新也不会出错


五、源码获取

  • 公众号回复关键字:xdx-mq-starter




原文始发于微信公众号(小道仙97):自定义平台MQ【值得一学】

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41293.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!