Dubbo注册协议原理以及源码阅读

导读:本篇文章讲解 Dubbo注册协议原理以及源码阅读,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前言

继上次小编所讲RPC协议暴露服务并且远程调用之后,小编这次给大家带来注册中心协议整体流程原理以及源码精讲,Dubbo协议服务暴露与引用以及源码分析文章中,远程服务暴露可以只通过RPC协议即可,那为什么要注册中心呢,不知道大家有没有考虑到当多服务多调用的时候,我们的服务端url不可能写死,那么我们的注册中心就上场了。那怎样才可以与RPC协议进行交互并且注册服务到注册中心,这就是本篇小编给大家带来的内容。

注册协议

在这里插入图片描述
如上图所示,RegistryProtocol与DubboProtocol都是实现了Protocol。
在服务暴露的时候,调用的是RegistryProtocol,这个RegistryProtocol内部包含了一个协议DubboProtocol,然后RegistryProtocol把url注册到ZookeeperRegistry。上图可以看到registry协议地址中export属性就是dubbo协议(上面registry协议地址只包含了服务暴露)。
这里服务暴露是这样,引用也是这样,通过RegistryProtocol去注册消费者,在服务引用的时候自动找到服务提供者之后建立连接然后发起调用。

注册协议服务暴露

服务暴露流程:
在这里插入图片描述
使用RegistryProtocol调用,进行服务的暴露然后进行url的注册。
小编从RegistryProtocol开始通过代码示例讲一下流程。(记得开启zookeeper)

public class RegistryProtocolTest {

    private static final String REGISTRY_URL_TEXT = "zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=boot-server&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.learn.code.IUserService";
    private static final String URL_TEXT = "dubbo://127.0.0.1:20880/com.learn.code.IUserService";
    @Before
    public void init() {
        ApplicationModel.getConfigManager().setApplication(new ApplicationConfig("test"));
    }

    @Test
    public void exportServer() throws IOException {
        //准备环境
        UserService userService = new UserService();
        ServiceRepository serviceRepository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = serviceRepository.registerService(IUserService.class);
        serviceRepository.registerProvider("com.learn.code.IUserService",
                userService, serviceDescriptor,
                new ServiceConfig<>(), null);
        //注册协议
        RegistryProtocol registryProtocol = new RegistryProtocol();
        //依赖实际协议
        registryProtocol.setProtocol(new DubboProtocol());
        //依赖注册工厂
        ZookeeperRegistryFactory zookeeperRegistryFactory = new ZookeeperRegistryFactory();
        //工厂的话依赖zookeeper的客户端
        zookeeperRegistryFactory.setZookeeperTransporter(new CuratorZookeeperTransporter());
        registryProtocol.setRegistryFactory(zookeeperRegistryFactory);

        //ProxyFactory jdkProxyFactory = new JdkProxyFactory();
        ProxyFactory javassistProxyFactory = new JavassistProxyFactory();
        //通过javassistProxyFactory(dubbo默认)获取到invoker后进行服务暴露
        registryProtocol.export(javassistProxyFactory.getInvoker(userService,IUserService.class, URL.valueOf(REGISTRY_URL_TEXT)));
        System.in.read();
    }
}


	//直接使用dubbo协议进行测试
	@Test
    public void referServer() throws IOException {
        DubboProtocol dubboProtocol = new DubboProtocol();
        Invoker<IUserService> invoker = dubboProtocol.refer(IUserService.class, URL.valueOf(URL_TEXT));
        ProxyFactory proxyFactory = new JdkProxyFactory();
        IUserService proxy = proxyFactory.getProxy(invoker);
        System.out.println(proxy.getUser(1L));

    }

测试结果

UserDto(id=1, age=18, name=Bob, desc=当前服务:null)

JavassistProxyFactory是dubbo默认的(基于动态生成代理类),与JdkProxyFactory通过反射不同,生成的代理类然后实例化后可以直接调用方法,而不是通过反射。大家有兴趣可自己看一下源码,小编不做详解。(有机会给大家看下生成代理类后的代码以及如何获取)

源码阅读

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    	// 注册地址
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally 服务提供者地址 即dubbo
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker 暴露远程服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
        	// 将url进行注册
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        // 在提供者模型上注册指定的url
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

上面最重要两步骤就是

//export invoker 暴露远程服务 上篇文章所讲的dubbo远程服务的暴露并且放入一个map中,以便后续操作(销毁)
final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);
// 将url进行注册 连接zookeeper然后写入url
register(registryUrl, registeredProviderUrl);

注册协议服务引用

注册协议引用流程:
在这里插入图片描述

与服务暴露流程不同的是,这边先写入消费者url然后创建invoker。创建invoker过程中包括了服务订阅以及服务引用和构建集群的Invoker。这是大致流程。下面小编详细讲一下每个步骤的过程。

  1. 注册消费者URL:这个没上面可讲的,根据消费端的配置封装成所需的url写入注册中心
  2. 服务订阅:消费端监听服务的提供者配置和路由,这个主要是在每次服务端有发生变更时触发通知,刚刚启动触发第一次通知,刷新invokers,这边如果提供端有多个urls,然后多次调用DubboProtocl进行进行引用。
  3. 构建集群Invoker:基于多个invokers构建一个集群Invoker即ClusterInvoker。
  4. 最后调用的时候,通过集群Invoker通过负载均衡机制选择一个invoker发起远程调用。

小编下面通过代码示例进行证明:

private static final String REF_URL_TEXT = "zookeeper://127.0.0.1:2181//org.apache.dubbo.registry.RegistryService?application=boot-client&dubbo=2.0.2&interface=com.learn.code.IUserService";

@Test
    public void refServer(){
        //注册协议
        RegistryProtocol registryProtocol = new RegistryProtocol();
        //依赖实际协议
        registryProtocol.setProtocol(new DubboProtocol());
        //依赖注册工厂
        ZookeeperRegistryFactory zookeeperRegistryFactory = new ZookeeperRegistryFactory();
        //工厂的话依赖zookeeper的客户端
        zookeeperRegistryFactory.setZookeeperTransporter(new CuratorZookeeperTransporter());
        registryProtocol.setRegistryFactory(zookeeperRegistryFactory);
        //服务监听调用引用创建集群Invoker
        Invoker<IUserService> refer = registryProtocol.refer(IUserService.class, URL.valueOf(REF_URL_TEXT));
        //使用代理对象
        ProxyFactory javassistProxyFactory = new JavassistProxyFactory();
        IUserServiceproxy = javassistProxyFactory.getProxy(refer);
        System.out.println(proxy.getUser(1L));
    }

测试结果和上面一样。
小编先带大家看下集群Invoker的结构是怎样的。
在这里插入图片描述
Dubbo在消费端封装的Invoker非常多。不管怎么封装其实原理不变的,下面小编给大家捋捋Invoker
在这里插入图片描述

Cluster接口中可以根据Directory我们构建一个ClusterInvoker,本身Cluster实现类中如FailbackCluster同样帮助创建一个FailbackClusterInvoker。这个大家不用特别关心。重点是AbstractClusterInvoker有一个了Directory,而服务注册表就保存在他里面,同时含有多个invoker,invoker底层为DubboInvoker,其包含了多个client,这个client则为NettyClient,这样就和上述debug图中的invoker对应起来了。当然集群模式下通过负载均衡机制挑选一个invoker发起调用。

解释完集群Invoker的结构,继续说一下服务引用的时序图
服务引用时序图
在这里插入图片描述
小编根据源码给大家解释一下(摘重要源码):主要看注释
org.apache.dubbo.registry.integration.RegistryProtocol#refer

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
		//取出消费端提供的url使用registerFactory构建register
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

		//取出配置所需要采用的集群,没有配置默认失败重试,开始服务引用
        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

org.apache.dubbo.registry.integration.RegistryProtocol#doRefer

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
		//创建最重要的对象RegistryDirectory,与注册中心与建立连接需要两个对象registry和远程协议
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            //注册消费者url,创建消费者临时节点
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        //服务订阅,这里代码比较复杂,请看下一块调用最终的地方
        directory.subscribe(toSubscribeUrl(subscribeUrl));
		//封装,directory保存了所有的invoker,即clusterInvoker
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe

            	//这边走的是else代码
                List<URL> urls = new ArrayList<>();
                //监听provider,config,router 
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    //添加子节点并且notify
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                    	//添加所有的提供者信息
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //触发第一次刷新invoker,里面方法进行了try catch,进行失败重试,最终调用listener.notify
                notify(url, listener, urls);

org.apache.dubbo.registry.integration.RegistryDirectory#notify

@Override
    public synchronized void notify(List<URL> urls) {
    	//过滤一些信息进行分类
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        // providers 重点看提供者信息
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

        //刷新invoker,这里面有比较新旧invokers来判断是否要销毁没有用的url
        refreshOverrideAndInvoker(providerURLs);
    }

org.apache.dubbo.registry.integration.RegistryDirectory#toInvokers

//建立连接封装DubboInvoker
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);

总结

本篇文章与上一篇RPC协议远程服务暴露才是完整的Dubbo服务暴露与引用的全过程。缺一不可,这次小编并没有提到服务的销毁流程,后面有时间补上,大家也可以自己去了解和熟悉一下。当然这里还不是面向使用者,更像是面对dubbo开发人员,服务的暴露和引用实际使用的时候为ServiceConfig和ReferenceConfig,小编拆散揉碎了来从底层一步一步往上层扩展。先是dubbo协议可以直接远程调用,然后使用registry协议将dubbo协议封装注册到注册中心上去,最后我们从使用者角度使用ServiceConfig和ReferenceConfig来调用registry协议来完成dubbo的整个调用过程。当然这边小编在dubbo的第一章节中Dubbo整体架构与核⼼组件认知以及简单示例就有其调用示例。
到这里小编的Dubbo源码之旅快接近尾声了,后面还有Dubbo远程通讯时编解码源码分析,Dubbo整个调用链路流程以及Dubbo服务销毁。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/13570.html

(0)
小半的头像小半

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!