继续Spring Webflux,WebClient实现非阻塞模式的远程调用


继续Spring Webflux,WebClient实现非阻塞模式的远程调用


WebClient是从Spring WebFlux 5.0版本开始提供的一个非阻塞的基于响应式编程的进行Http请求的客户端工具。它的响应式编程的基于Reactor的。WebClient中提供了标准Http请求方式对应的get、post、put、delete等方法,可以用来发起相应的请求。

个人认为目前Spring Webflux比较适合的应用场景就是网关,比如Spring Cloud Gateway就是一个例子,因为目前它对于关系型数据库尚不支持,目前已支持的数据库包括Redis、MongoDB等Nosql数据库。

而今天的这个Demo就是基于Spring Webflux实践的服务端的远程调用,类似于Openfeign,但是由于它是基于Webflux的,因此是非阻塞的,性能比较好。

预期效果

利用Webclient,实现webflux-client对于p-webflux的远程调用。

设计

继续Spring Webflux,WebClient实现非阻塞模式的远程调用
image-20210517175023548

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

编码

注释接口

/**
 * @author: yunho
 * @description: 服务器相关信息
 */

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ApiServer {
    String value() default "";
}

实体类

/**
 * @author: yunho
 * @description:
 */

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerInfo {
    private String url;
}
/**
 * @author: yunho
 * @description:方法调用信息类
 */

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MethodInfo {
    private String url;
    private HttpMethod httpMethod;
    private Map<String,Object> params;
    private Mono body;
    /**
     * 请求body的类型
     */

    private Class<?> bodyClass;
    /**
     * 是否flux,否则为mono
     */

    private Boolean isFlux;
    /**
     * 返回对象类型
     */

    private Class<?> returnElementType;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String id;

    private String name;

    private int age;
}

FactoryBean配置类

@Configuration
public class FactoryBeanConfig {
    @Bean
    public ProxyCreator proxyCreator(){
        return new JdkProxyCreator();
    }
    @Bean
    FactoryBean<IUser> userApi(ProxyCreator proxyCreator){
        return new FactoryBean<IUser>() {
            //返回代理对象
            @Override
            public IUser getObject() throws Exception {
                return (IUser) proxyCreator.createProxy(this.getObjectType());
            }

            @Override
            public Class<?> getObjectType() {
                return IUser.class;
            }
        };
    }
}

接口

public interface ProxyCreator {
    Object createProxy(Class<?> type);
}
/**
 * @author: yunho
 * @date: 2021/5/14 16:21
 * @description:Rest 请求调用
 */

public interface RestHandler {
    /**
     * 初始化服务器信息
     * @param serverInfo
     */

    void init(ServerInfo serverInfo);

    /**
     * 调用rest接口,返回接口
     * @param methodInfo
     * @return
     */

    Object invokeRest(MethodInfo methodInfo);
}
@ApiServer("http://localhost:8089/ruser")
public interface IUser {
    @GetMapping("/")
    Flux<User> getAllUser();
    @GetMapping("/{id}")
    Mono<User> getUserById(@PathVariable("id") String id);
    @DeleteMapping("/{id}")
    Mono<Void> deleteUser(@PathVariable("id") String id);
    @PostMapping("/")
    Mono<User> putUser(@RequestBody Mono<User> userMono);
}

代理类

/**
 * @author: yunho
 * @description: 使用jdk动态代理
 */

@Slf4j
public class JdkProxyCreator implements ProxyCreator {

    @Override
    public Object createProxy(Class<?> type) {
        log.info("createProxy*****"+type);

        //根据接口得到API信息
        ServerInfo serverInfo = extractServerInfo(type);
        log.info("serverInfo*****"+serverInfo);
        //给每一个代理类一个实现
        RestHandler restHandler = new WebClientRestHandler();
        //初始化服务器信息 初始化web client
        restHandler.init(serverInfo);
        return Proxy.newProxyInstance(this.getClass().getClassLoader(),
                new Class[]{type}, new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //根据方法和参数得到调用信息
                        MethodInfo methodInfo = extractMethodInfo(method,args);
                        log.info("methodInfo*****"+methodInfo);
                        //调用rest
                        //这种写法 发现serverInfo是每次调用都会传,而且它是一样的,因此可以将它在类初始化时定义。
//                        return restHandler.invokeRest(serverInfo,methodInfo);
                        return restHandler.invokeRest(methodInfo);
                    }

                    /**
                     * 根据方法定义和调用参数得到相关信息
                     * @param method
                     * @param args
                     * @return
                     */

                    private MethodInfo extractMethodInfo(Method method, Object[] args) {

                        MethodInfo methodinfo = new MethodInfo();
                        extractHttpMethod(method, methodinfo);
                        extractRequestParams(method, args, methodinfo);
                        //提取返回对象信息
                        extractReturnInfo(method,methodinfo);
                        return methodinfo;
                    }

                    /**
                     * 提取返回对象信息
                     * @param method
                     * @param methodinfo
                     */

                    private void extractReturnInfo(Method method, MethodInfo methodinfo) {
                        //isAssignableFrom 判断是否某个类的子类,instanceof 判断实例
                        boolean assignableFrom = method.getReturnType().isAssignableFrom(Flux.class);
                        methodinfo.setIsFlux(assignableFrom);
                        //得到返回对象的实际类型
                        Class<?> elementType = extractElementType(method.getGenericReturnType());
                        methodinfo.setReturnElementType(elementType);
                    }

                    /**
                     * 得到泛型类型的实际类型
                     * @param genericReturnType
                     * @return
                     */

                    private Class<?> extractElementType(Type genericReturnType) {
                        Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
                        return (Class<?>) actualTypeArguments[0];
                    }

                    /**
                     * 获取请求参数
                     * @param method
                     * @param args
                     * @param methodinfo
                     */

                    private void extractRequestParams(Method method, Object[] args, MethodInfo methodinfo) {
                        //得到参数和body
                        Parameter[] parameters = method.getParameters();
                        Map<String,Object> params = new LinkedHashMap<>();
                        methodinfo.setParams(params);

                        for (int i = 0; i < parameters.length; i++) {
                            PathVariable pathVariable = parameters[i].getAnnotation(PathVariable.class);
                            if(pathVariable !=null){
                                params.put(pathVariable.value(), args[i]);
                            }
                            //是否带了requestBody
                            RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
                            if(requestBody!=null){
                                methodinfo.setBody((Mono<?>) args[i]);
                                //请求对象的实际类型
                                methodinfo.setBodyClass(extractElementType(parameters[i].getParameterizedType()));
                            }

                        }
                    }

                    /**
                     * 获取url和httpmethod
                     * @param method
                     * @param methodinfo
                     */

                    private void extractHttpMethod(Method method, MethodInfo methodinfo) {
                        Annotation[] annotations = method.getAnnotations();
                        for (Annotation a:annotations
                             ) {
                            //GET
                            if(a instanceof GetMapping){
                                GetMapping getMapping = (GetMapping) a;
                                methodinfo.setUrl(getMapping.value()[0]);
                                methodinfo.setHttpMethod(HttpMethod.GET);
                            }//POST
                            else if(a instanceof PostMapping){
                                PostMapping m = (PostMapping) a;
                                methodinfo.setUrl(m.value()[0]);
                                methodinfo.setHttpMethod(HttpMethod.POST);
                            }//DELETE
                            else if(a instanceof DeleteMapping){
                                DeleteMapping m = (DeleteMapping) a;
                                methodinfo.setUrl(m.value()[0]);
                                methodinfo.setHttpMethod(HttpMethod.DELETE);
                            }
                        }
                    }
                });
    }

    /**
     * 返回服务器信息
     * @param type
     * @return
     */

    private ServerInfo extractServerInfo(Class<?> type) {
        ServerInfo serverinfo = new ServerInfo();
        ApiServer annotation = type.getAnnotation(ApiServer.class);
        serverinfo.setUrl(annotation.value());
        return serverinfo;
    }
}

核心业务处理类

/**
 * @author: yunho
 * @description:
 */

public class WebClientRestHandler implements RestHandler {

    private WebClient webClient;

    //初始化Webclient端
    @Override
    public void init(ServerInfo serverInfo) {
        this.webClient = WebClient.create(serverInfo.getUrl());
    }

    /**
     * 处理Rest请求
     * @param methodInfo
     * @return
     */

    @Override
    public Object invokeRest(MethodInfo methodInfo) {
        Object result =null;
        WebClient.RequestBodySpec accept = this.webClient
                .method(methodInfo.getHttpMethod())
                //url和参数
                .uri(methodInfo.getUrl(), methodInfo.getParams())
                .accept(MediaType.APPLICATION_JSON);
        WebClient.ResponseSpec retrieve =null;
        //判断是否有body
        if(methodInfo.getBody()!=null){
            //发出请求
            retrieve= accept.body(methodInfo.getBody(), methodInfo.getBodyClass()).retrieve();
        }else{
            retrieve = accept.retrieve();
        }
        retrieve.onStatus(status -> status.value()== HttpStatus.NOT_FOUND.value()
                , response -> Mono.just(new RuntimeException("未找到指定对象")));
        //处理flux还是mono
        if(methodInfo.getIsFlux()){
            result=  retrieve.bodyToFlux(methodInfo.getReturnElementType());
        }else{
            result=retrieve.bodyToMono(methodInfo.getReturnElementType());
        }
        //处理body
                

        return result;
    }
}

总结

整个代码量很小,但是实现了预期的效果,而且具备良好的扩展性

数据结构:ServerInfo和MethodInfo作为routerfunction 动态代理的实体类,与其他框架无耦合

接口:IUser 业务接口、ProxyCreator 动态代理接口、RestHandler Rest请求处理接口(核心业务)

实现类:JdkProxyCreator 基于JDK的动态代理实现。

分享一些原则

程序=数据结构+算法

设计的最重要的是解耦

实现解耦的关键是设计自己的数据结构+抽象接口

只要有改变的可能就应该定义一个接口


原文始发于微信公众号(云户):继续Spring Webflux,WebClient实现非阻塞模式的远程调用

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25875.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!