Spring Cloud Alibaba Series: 2. Nacos Core Source Code Analysis (Part 2)


Chapter 2: Configuration Center


2.1 Initial Load

2.1.1 Loading the Environment at Startup

PropertySourceBootstrapConfiguration

1. Spring Boot startup: the run method

ConfigurableEnvironment environment = this.prepareEnvironment(listeners, applicationArguments);

2. Step into prepareEnvironment

private ConfigurableEnvironment prepareEnvironment(SpringApplicationRunListeners listeners, ApplicationArguments applicationArguments) {
    ConfigurableEnvironment environment = this.getOrCreateEnvironment();
    this.configureEnvironment((ConfigurableEnvironment)environment, applicationArguments.getSourceArgs());
    ConfigurationPropertySources.attach((Environment)environment);
    // Notify the environment listeners (step in here)
    listeners.environmentPrepared((ConfigurableEnvironment)environment);
    this.bindToSpringApplication((ConfigurableEnvironment)environment);
    if (!this.isCustomEnvironment) {
        environment = (new EnvironmentConverter(this.getClassLoader())).convertEnvironmentIfNecessary((ConfigurableEnvironment)environment, this.deduceEnvironmentClass());
    }

    ConfigurationPropertySources.attach((Environment)environment);
    return (ConfigurableEnvironment)environment;
}

3. The environmentPrepared method

void environmentPrepared(ConfigurableEnvironment environment) {
    Iterator var2 = this.listeners.iterator();

    while(var2.hasNext()) {
        SpringApplicationRunListener listener = (SpringApplicationRunListener)var2.next();
        // Each run listener publishes the event
        listener.environmentPrepared(environment);
    }

}

4. The run listener publishes an ApplicationEnvironmentPreparedEvent

public void environmentPrepared(ConfigurableEnvironment environment) {
    this.initialMulticaster.multicastEvent(new ApplicationEnvironmentPreparedEvent(this.application, this.args, environment));
}

This eventually invokes the onApplicationEvent method of each ApplicationListener.

The call path is: multicastEvent -> invokeListener -> doInvokeListener

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    try {
        // Invoke onApplicationEvent
        listener.onApplicationEvent(event);
    } catch (ClassCastException var6) {
        String msg = var6.getMessage();
        if (msg != null && !this.matchesClassCastMessage(msg, event.getClass())) {
            throw var6;
        }

        Log logger = LogFactory.getLog(this.getClass());
        if (logger.isTraceEnabled()) {
            logger.trace("Non-matching event type for listener: " + listener, var6);
        }
    }

}
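The dispatch path above boils down to "loop over the registered listeners and hand each one the event". Below is a minimal plain-Java sketch of that idea. SimpleMulticaster is a hypothetical stand-in, not Spring's actual multicaster, and it leaves out event-type matching, executors, and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an application event multicaster.
class SimpleMulticaster {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    // Every registered listener receives the event, in registration order.
    void multicastEvent(Object event) {
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
    }
}
```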

5. BootstrapApplicationListener handles the ApplicationEnvironmentPreparedEvent

// onApplicationEvent() holds the follow-up logic a listener runs once it receives an event.
public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
    ConfigurableEnvironment environment = event.getEnvironment();
    if ((Boolean)environment.getProperty("spring.cloud.bootstrap.enabled", Boolean.class, true)) {
        if (!environment.getPropertySources().contains("bootstrap")) {
            ConfigurableApplicationContext context = null;
            String configName = environment.resolvePlaceholders("${spring.cloud.bootstrap.name:bootstrap}");
            Iterator var5 = event.getSpringApplication().getInitializers().iterator();

            while(var5.hasNext()) {
                ApplicationContextInitializer<?> initializer = (ApplicationContextInitializer)var5.next();
                if (initializer instanceof ParentContextApplicationContextInitializer) {
                    context = this.findBootstrapContext((ParentContextApplicationContextInitializer)initializer, configName);
                }
            }

            if (context == null) {
                // Call bootstrapServiceContext
                context = this.bootstrapServiceContext(environment, event.getSpringApplication(), configName);
                event.getSpringApplication().addListeners(new ApplicationListener[]{new BootstrapApplicationListener.CloseContextOnFailureApplicationListener(context)});
            }

            this.apply(context, event.getSpringApplication(), environment);
        }
    }
}

6. bootstrapServiceContext loads the configuration class BootstrapImportSelectorConfiguration

builder.sources(new Class[]{BootstrapImportSelectorConfiguration.class}); 
ConfigurableApplicationContext context = builder.run(new String[0]);

7. The BootstrapImportSelectorConfiguration class

@Configuration(
    proxyBeanMethods = false
)
@Import({BootstrapImportSelector.class})
public class BootstrapImportSelectorConfiguration {
    public BootstrapImportSelectorConfiguration() {
    }
}

8. It imports the BootstrapImportSelector class

public String[] selectImports(AnnotationMetadata annotationMetadata) {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    // Looks up the BootstrapConfiguration entries in META-INF/spring.factories
    List<String> names = new ArrayList(SpringFactoriesLoader.loadFactoryNames(BootstrapConfiguration.class, classLoader));
    names.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(this.environment.getProperty("spring.cloud.bootstrap.sources", ""))));
    List<BootstrapImportSelector.OrderedAnnotatedElement> elements = new ArrayList();
    Iterator var5 = names.iterator();

    while(var5.hasNext()) {
        String name = (String)var5.next();

        try {
            elements.add(new BootstrapImportSelector.OrderedAnnotatedElement(this.metadataReaderFactory, name));
        } catch (IOException var8) {
        }
    }

    AnnotationAwareOrderComparator.sort(elements);
    String[] classNames = (String[])elements.stream().map((e) -> {
        return e.name;
    }).toArray((x$0) -> {
        return new String[x$0];
    });
    return classNames;
}
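To make the spring.factories lookup concrete, here is a minimal sketch that parses spring.factories-style content with java.util.Properties and returns the comma-separated class names under one key. FactoriesSketch and the com.example class names in the usage note are hypothetical. The real SpringFactoriesLoader also scans every JAR on the classpath and caches the results, which this sketch does not attempt.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reads one key from spring.factories-style content.
class FactoriesSketch {
    static List<String> loadFactoryNames(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        // The value is a comma-separated list of fully qualified class names.
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }
}
```

For example, given the content `org.springframework.cloud.bootstrap.BootstrapConfiguration=com.example.FirstConfig, com.example.SecondConfig`, looking up that key yields the two class names in declaration order, and an unknown key yields an empty list.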

9. Look up the BootstrapConfiguration entries; they come from two JARs

The NacosConfigBootstrapConfiguration class


The PropertySourceBootstrapConfiguration class


10. Back in the run method

this.prepareContext(context, environment, listeners, applicationArguments, printedBanner);

11. prepareContext calls the applyInitializers method

protected void applyInitializers(ConfigurableApplicationContext context) {
    Iterator var2 = this.getInitializers().iterator();

    while(var2.hasNext()) {
        ApplicationContextInitializer initializer = (ApplicationContextInitializer)var2.next();
        Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(initializer.getClass(), ApplicationContextInitializer.class);
        Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
        initializer.initialize(context);
    }

}

12. This method runs initialize on every ApplicationContextInitializer implementation, and PropertySourceBootstrapConfiguration happens to be one of them

2.1.2 initialize

This invokes the initialize method of PropertySourceBootstrapConfiguration.

1. The initialize method

public void initialize(ConfigurableApplicationContext applicationContext) {
    List<PropertySource<?>> composite = new ArrayList();
    AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
    boolean empty = true;
    ConfigurableEnvironment environment = applicationContext.getEnvironment();
    Iterator var5 = this.propertySourceLocators.iterator();

    while(true) {
        Collection source;
        do {
            do {
                if (!var5.hasNext()) {
                    if (!empty) {
                        MutablePropertySources propertySources = environment.getPropertySources();
                        String logConfig = environment.resolvePlaceholders("${logging.config:}");
                        LogFile logFile = LogFile.get(environment);
                        Iterator var14 = environment.getPropertySources().iterator();

                        while(var14.hasNext()) {
                            PropertySource<?> p = (PropertySource)var14.next();
                            if (p.getName().startsWith("bootstrapProperties")) {
                                propertySources.remove(p.getName());
                            }
                        }

                        this.insertPropertySources(propertySources, composite);
                        this.reinitializeLoggingSystem(environment, logConfig, logFile);
                        this.setLogLevels(applicationContext, environment);
                        this.handleIncludedProfiles(environment);
                    }

                    return;
                }

                PropertySourceLocator locator = (PropertySourceLocator)var5.next();
                // Key call: fetch the configuration
                source = locator.locateCollection(environment);
            } while(source == null);
        } while(source.size() == 0);

        List<PropertySource<?>> sourceList = new ArrayList();
        Iterator var9 = source.iterator();

        while(var9.hasNext()) {
            PropertySource<?> p = (PropertySource)var9.next();
            sourceList.add(new BootstrapPropertySource(p));
        }

        logger.info("Located property source: " + sourceList);
        composite.addAll(sourceList);
        empty = false;
    }
}

2. The locateCollection method

static Collection<PropertySource<?>> locateCollection(PropertySourceLocator locator, Environment environment) {
    // Load the PropertySource
    PropertySource<?> propertySource = locator.locate(environment);
    if (propertySource == null) {
        return Collections.emptyList();
    } else if (CompositePropertySource.class.isInstance(propertySource)) {
        Collection<PropertySource<?>> sources = ((CompositePropertySource)propertySource).getPropertySources();
        List<PropertySource<?>> filteredSources = new ArrayList();
        Iterator var5 = sources.iterator();

        while(var5.hasNext()) {
            PropertySource<?> p = (PropertySource)var5.next();
            if (p != null) {
                filteredSources.add(p);
            }
        }

        return filteredSources;
    } else {
        return Arrays.asList(propertySource);
    }
}

3. locator.locate(environment) dispatches to the NacosPropertySourceLocator implementation

public PropertySource<?> locate(Environment env) {
    this.nacosConfigProperties.setEnvironment(env);
    ConfigService configService = this.nacosConfigManager.getConfigService();
    if (null == configService) {
        log.warn("no instance of config service found, can't load config from nacos");
        return null;
    } else {
        long timeout = (long)this.nacosConfigProperties.getTimeout();
        this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
        String name = this.nacosConfigProperties.getName();
        String dataIdPrefix = this.nacosConfigProperties.getPrefix();
        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = name;
        }
        // This fallback is why the dataId prefix defaults to spring.application.name
        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = env.getProperty("spring.application.name");
        }

        CompositePropertySource composite = new CompositePropertySource("NACOS");
        // Load shared configs
        this.loadSharedConfiguration(composite);
        // Load extension configs
        this.loadExtConfiguration(composite);
        // Load the application's own config
        this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);
        return composite;
    }
}

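The load order is shared, extension, then application. Later loads override earlier ones, so the precedence is application > extension > shared.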
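The precedence rule follows from how sources are stacked: each newly loaded source goes to the front, and lookups scan from the front. Here is a minimal sketch of that behavior, assuming plain maps in place of real PropertySource objects. PrecedenceSketch is a hypothetical name, not part of Spring or Nacos.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hypothetical model of a composite source that puts newer sources first.
class PrecedenceSketch {
    private final Deque<Map<String, String>> sources = new ArrayDeque<>();

    // Later-loaded sources are pushed to the front, so they are consulted first.
    void addFirst(Map<String, String> source) {
        sources.addFirst(source);
    }

    // Returns the value from the first source that defines the key, or null.
    String getProperty(String key) {
        for (Map<String, String> source : sources) {
            if (source.containsKey(key)) {
                return source.get(key);
            }
        }
        return null;
    }
}
```

Adding a shared map, then an extension map, then an application map that all define the same key makes getProperty return the application value.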

4. loadSharedConfiguration loads the shared configs

private void loadSharedConfiguration(CompositePropertySource compositePropertySource) {
    // Check whether any shared configs are declared
    List<Config> sharedConfigs = this.nacosConfigProperties.getSharedConfigs();
    if (!CollectionUtils.isEmpty(sharedConfigs)) {
        this.checkConfiguration(sharedConfigs, "shared-configs");
        // Load the configs from Nacos
        this.loadNacosConfiguration(compositePropertySource, sharedConfigs);
    }

}

5. The loadNacosConfiguration method

private void loadNacosConfiguration(final CompositePropertySource composite, List<Config> configs) {
    Iterator var3 = configs.iterator();

    while(var3.hasNext()) {
        Config config = (Config)var3.next();
        // Get the dataId of the config
        String dataId = config.getDataId();
        // Derive the file extension from the dataId
        String fileExtension = dataId.substring(dataId.lastIndexOf(".") + 1);
        this.loadNacosDataIfPresent(composite, dataId, config.getGroup(), fileExtension, config.isRefresh());
    }

}
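The extension is taken as everything after the last dot in the dataId. The small sketch below isolates that expression (FileExtensionSketch is a hypothetical wrapper). It also shows an edge case worth knowing: when the dataId has no dot, lastIndexOf returns -1, so the whole dataId is returned as the "extension".

```java
// Hypothetical wrapper around the expression used in loadNacosConfiguration.
class FileExtensionSketch {
    static String fileExtension(String dataId) {
        // lastIndexOf returns -1 when there is no dot, so substring(0) yields the whole dataId.
        return dataId.substring(dataId.lastIndexOf(".") + 1);
    }
}
```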

6. The loadNacosDataIfPresent method

private void loadNacosDataIfPresent(final CompositePropertySource composite, final String dataId, final String group, String fileExtension, boolean isRefreshable) {
    if (null != dataId && dataId.trim().length() >= 1) {
        if (null != group && group.trim().length() >= 1) {
            NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);
            this.addFirstPropertySource(composite, propertySource, false);
        }
    }
}

private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, String fileExtension, boolean isRefreshable) {
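    // If this is not the first load (refresh count != 0) and the config is not refreshable,
    // read it from NacosPropertySourceRepository.getNacosPropertySource(dataId, group) (local memory, backed by a ConcurrentHashMap);
    // otherwise call nacosPropertySourceBuilder.build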
    return NacosContextRefresher.getRefreshCount() != 0L && !isRefreshable ? NacosPropertySourceRepository.getNacosPropertySource(dataId, group) : this.nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
}

7. The nacosPropertySourceBuilder.build method

NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {
    // Fetch the config from the Nacos server
    Map<String, Object> p = this.loadNacosData(dataId, group, fileExtension);
    // p holds the parsed config
    NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId, p, new Date(), isRefreshable);
    // Store the config in the local in-memory repository
    NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
    return nacosPropertySource;
}

8. The loadNacosData method

private Map<String, Object> loadNacosData(String dataId, String group, String fileExtension) {
    String data = null;

    try {
        // Keep tracing: this eventually reaches getConfigInner
        data = this.configService.getConfig(dataId, group, this.timeout);
        if (StringUtils.isEmpty(data)) {
            log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group);
            return EMPTY_MAP;
        }

        if (log.isDebugEnabled()) {
            log.debug(String.format("Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data));
        }

        Map<String, Object> dataMap = NacosDataParserHandler.getInstance().parseNacosData(data, fileExtension);
        return dataMap == null ? EMPTY_MAP : dataMap;
    } catch (NacosException var6) {
        log.error("get data from Nacos error,dataId:{}, ", dataId, var6);
    } catch (Exception var7) {
        log.error("parse data from Nacos error,dataId:{},data:{},", new Object[]{dataId, data, var7});
    }

    return EMPTY_MAP;
}

9. The getConfigInner method

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    group = this.null2defaultGroup(group);
    ParamUtils.checkKeyParam(dataId, group);
    ConfigResponse cr = new ConfigResponse();
    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);
    // Prefer the local failover config
    String content = LocalConfigInfoProcessor.getFailover(this.agent.getName(), dataId, group, tenant);
    if (content != null) {
        LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
        cr.setContent(content);
        this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
        content = cr.getContent();
        return content;
    } else {
        try {
            // Fetch the config from the remote server via an HTTP request (agent.httpGet)
            String[] ct = this.worker.getServerConfig(dataId, group, tenant, timeoutMs);
            cr.setContent(ct[0]);
            this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
            content = cr.getContent();
            return content;
        } catch (NacosException var9) {
            if (403 == var9.getErrCode()) {
                throw var9;
            } else {
                LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", new Object[]{this.agent.getName(), dataId, group, tenant, var9.toString()});
                LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
                content = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), dataId, group, tenant);
                cr.setContent(content);
                this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
                content = cr.getContent();
                return content;
            }
        }
    }
}
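The lookup order in getConfigInner is: local failover file first, then the server, and finally the local snapshot if the server call fails. The sketch below captures only that ordering, with suppliers standing in for the three sources. ConfigLookupSketch is hypothetical, and the sketch deliberately omits the filter chain and the special case where a 403 error is rethrown.

```java
import java.util.function.Supplier;

// Hypothetical model of the failover -> server -> snapshot lookup order.
class ConfigLookupSketch {
    static String getConfig(Supplier<String> failover,
                            Supplier<String> server,
                            Supplier<String> snapshot) {
        // 1. A local failover file, if present, wins outright.
        String content = failover.get();
        if (content != null) {
            return content;
        }
        try {
            // 2. Otherwise ask the remote server.
            return server.get();
        } catch (RuntimeException e) {
            // 3. If the server call fails, fall back to the last saved snapshot.
            return snapshot.get();
        }
    }
}
```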

10. Reading from a local disk file

public static String getFailover(String serverName, String dataId, String group, String tenant) {
    // Resolve the local file path
    File localPath = getFailoverFile(serverName, dataId, group, tenant);
    if (localPath.exists() && localPath.isFile()) {
        try {
            return readFile(localPath);
        } catch (IOException var6) {
            LOGGER.error("[" + serverName + "] get failover error, " + localPath, var6);
            return null;
        }
    } else {
        return null;
    }
}

11. The getServerConfig method

public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
    String[] ct = new String[2];
    // Fall back to the default group when group is null, empty, or blank
    if (StringUtils.isBlank(group)) {
        group = "DEFAULT_GROUP";
    }

    HttpResult result = null;

    try {
        List<String> params = null;
        if (StringUtils.isBlank(tenant)) {
            params = new ArrayList(Arrays.asList("dataId", dataId, "group", group));
        } else {
            params = new ArrayList(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
        }

        result = this.agent.httpGet("/v1/cs/configs", (List)null, params, this.agent.getEncode(), readTimeout);
    } catch (IOException var10) {
        String message = String.format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", this.agent.getName(), dataId, group, tenant);
        LOGGER.error(message, var10);
        throw new NacosException(500, var10);
    }

    switch(result.code) {
    case 200:
        // Save a snapshot to a local file
        LocalConfigInfoProcessor.saveSnapshot(this.agent.getName(), dataId, group, tenant, result.content);
        ct[0] = result.content;
        if (result.headers.containsKey("Config-Type")) {
            ct[1] = (String)((List)result.headers.get("Config-Type")).get(0);
        } else {
            ct[1] = ConfigType.TEXT.getType();
        }

        return ct;
    case 403:
        LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", new Object[]{this.agent.getName(), dataId, group, tenant});
        throw new NacosException(result.code, result.content);
    case 404:
        LocalConfigInfoProcessor.saveSnapshot(this.agent.getName(), dataId, group, tenant, (String)null);
        return ct;
    case 409:
        LOGGER.error("[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}", new Object[]{this.agent.getName(), dataId, group, tenant});
        throw new NacosException(409, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
    default:
        LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", new Object[]{this.agent.getName(), dataId, group, tenant, result.code});
        throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
    }
}

2.2 Dynamic Change Detection

1. As noted earlier, spring.factories also loads another class, NacosConfigBootstrapConfiguration


2. NacosConfigBootstrapConfiguration registers NacosConfigManager as a bean

@Bean
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {
    return new NacosConfigManager(nacosConfigProperties);
}

3. Instantiating NacosConfigManager also instantiates NacosConfigService

public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
    this.nacosConfigProperties = nacosConfigProperties;
    createConfigService(nacosConfigProperties);
}

static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {
    if (Objects.isNull(service)) {
        Class var1 = NacosConfigManager.class;
        synchronized(NacosConfigManager.class) {
            try {
                if (Objects.isNull(service)) {
                    service = NacosFactory.createConfigService(nacosConfigProperties.assembleConfigServiceProperties());
                }
            } catch (NacosException var4) {
                log.error(var4.getMessage());
                throw new NacosConnectionFailureException(nacosConfigProperties.getServerAddr(), var4.getMessage(), var4);
            }
        }
    }

    return service;
}
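createConfigService uses the double-checked locking idiom: check for null, take the lock, check again, then create. Below is a generic sketch of that idiom. LazyHolder is a hypothetical class, and declaring the field volatile is an assumption of this sketch, since the excerpt above does not show how the service field is declared.

```java
import java.util.function.Supplier;

// Hypothetical generic holder that creates its value at most once.
class LazyHolder<T> {
    // volatile (an assumption here) makes the published instance safely visible to other threads.
    private volatile T instance;
    private final Supplier<T> factory;

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;
        if (local == null) {                 // first check, without locking
            synchronized (this) {
                local = instance;
                if (local == null) {         // second check, under the lock
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```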

Instantiating NacosConfigService:

public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        // Load the NacosConfigService class by name
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        // Get the NacosConfigService constructor that takes a Properties argument
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable var4) {
        throw new NacosException(-400, var4);
    }
}
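The factory above creates the service reflectively: load a class by name, find the constructor that takes a Properties argument, and call it. The sketch below shows the same three steps against a JDK class, so it runs without Nacos on the classpath. ReflectiveFactorySketch is hypothetical, and java.util.Properties is used as the target only because it happens to have a public Properties(Properties) constructor.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical helper mirroring the Class.forName -> getConstructor -> newInstance steps.
class ReflectiveFactorySketch {
    static Object create(String className, Properties properties) {
        try {
            Class<?> implClass = Class.forName(className);
            // Look up the public constructor that takes a single Properties argument.
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // The original wraps failures in NacosException; this sketch uses an unchecked exception.
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }
}
```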

4. Instantiating NacosConfigService invokes its constructor

public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty("encode");
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = "UTF-8";
    } else {
        this.encode = encodeTmp.trim();
    }

    this.initNamespace(properties);
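    // User login information.
    // ServerHttpAgent schedules a task that logs in to Nacos every 5 seconds (the client needs a connection to fetch configs and the service registry list).
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    // Maintain the Nacos server list.
    // This actually calls start() on ServerHttpAgent, which maintains the Nacos server list with a scheduled task.
    this.agent.start();
    // Update and maintain configs.
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

public synchronized void start() throws NacosException {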

    if (isStarted || isFixed) {
        return;
    }

    // Maintain the server URL list
    GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
    for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
        // Check whether the server list has changed and update it if so
        getServersTask.run();
        try {
            this.wait((i + 1) * 100L);
        } catch (Exception e) {
            LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
        }
    }

    if (serverUrls.isEmpty()) {
        LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
                     addressServerUrl);
        throw new NacosException(NacosException.SERVER_ERROR,
                                 "fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
    }

    // Schedule the same task to run periodically, once every 30 seconds
    TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
    isStarted = true;
}

5. Now look at ClientWorker

public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;
    this.init(properties);
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    // Thread pool for long polling
    this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    this.executor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            try {
                // Run the config-check logic (it dispatches long-polling tasks to the thread pool)
                ClientWorker.this.checkConfigInfo();
            } catch (Throwable var2) {
                ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2);
            }

        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

6. Checking the configs

public void checkConfigInfo() {
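    // cacheMap holds CacheData objects.
    // When a config needs auto-refresh, an entry is added to cacheMap.
    // The key is built from the group and dataId; the value is the CacheData.
    int listenerSize = ((Map)this.cacheMap.get()).size();
    // Each long-polling task handles at most 3000 configs; beyond that, a new task is started. With fewer than 3000, a LongPollingRunnable is therefore submitted only on the first check.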
    int longingTaskCount = (int)Math.ceil((double)listenerSize / ParamUtil.getPerTaskConfigSize());
    if ((double)longingTaskCount > this.currentLongingTaskCount) {
        for(int i = (int)this.currentLongingTaskCount; i < longingTaskCount; ++i) {
            this.executorService.execute(new ClientWorker.LongPollingRunnable(i));
        }

        this.currentLongingTaskCount = (double)longingTaskCount;
    }

}
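The number of long-polling tasks is just the listener count divided by the per-task size, rounded up. A minimal sketch of that arithmetic follows, using the 3000-per-task figure from the comment above. LongPollingTaskCountSketch is a hypothetical name.

```java
// Hypothetical helper isolating the task-count arithmetic from checkConfigInfo.
class LongPollingTaskCountSketch {
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        // Round up so a partially filled batch still gets its own task.
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }
}
```

With a per-task size of 3000, zero listeners need no task, 1 to 3000 listeners need one task, and 3001 listeners need two.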

7. The LongPollingRunnable class

class LongPollingRunnable implements Runnable {
    private int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    public void run() {
        List<CacheData> cacheDatas = new ArrayList();
        ArrayList inInitializingCacheList = new ArrayList();

        try {
            // Pick the configs this task is responsible for, based on taskId
            Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();

            while(var3.hasNext()) {
                CacheData cacheData = (CacheData)var3.next();
                if (cacheData.getTaskId() == this.taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // Compare the data cached in the local file with the cacheData to detect changes
                        ClientWorker.this.checkLocalConfig(cacheData);
                        // If nothing changed, nothing happens; if something changed, notify the listeners
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception var13) {
                        ClientWorker.LOGGER.error("get local config info error", var13);
                    }
                }
            }

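            // Long polling: the default timeout is 30s. If a config changes within 30s the server responds at once; otherwise it returns an empty result after 30s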
            List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            ClientWorker.LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            Iterator var16 = changedGroupKeys.iterator();

            while(var16.hasNext()) {
                String groupKey = (String)var16.next();
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }

                try {
                    // Fetch the latest value from the config center for each changed key
                    String[] ct = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
                    // Update the local cache
                    CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }

                    ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]});
                } catch (NacosException var12) {
                    String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
                    ClientWorker.LOGGER.error(message, var12);
                }
            }

            var16 = cacheDatas.iterator();

            while(true) {
                CacheData cacheDatax;
                do {
                    if (!var16.hasNext()) {
                        inInitializingCacheList.clear();
                        ClientWorker.this.executorService.execute(this);
                        return;
                    }

                    cacheDatax = (CacheData)var16.next();
                } while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));

                cacheDatax.checkListenerMd5();
                cacheDatax.setInitializing(false);
            }
        } catch (Throwable var14) {
            ClientWorker.LOGGER.error("longPolling error : ", var14);
            ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}

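checkLocalConfig handles three cases when checking the local configuration:

  • If isUseLocalConfigInfo is false but the local cache file exists, set isUseLocalConfigInfo to true and update the cacheData content and the recorded file modification time.
  • If isUseLocalConfigInfo is true but the local cache file does not exist, set it to false without notifying the listeners.
  • If isUseLocalConfigInfo is true and the local cache file exists, but the cached timestamp differs from the file's modification time, update the content in cacheData and keep isUseLocalConfigInfo set to true.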
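The three cases above can be sketched as a small state update. This is an illustration of the rules as listed, not the Nacos implementation: LocalConfigStateSketch and its field and parameter names are hypothetical, and the file is represented by plain arguments instead of real file I/O.

```java
// Hypothetical model of the three checkLocalConfig cases.
class LocalConfigStateSketch {
    boolean useLocalConfigInfo;
    long localConfigLastModified;
    String content;

    // fileExists, fileLastModified, and fileContent describe the local cache file.
    void check(boolean fileExists, long fileLastModified, String fileContent) {
        if (!useLocalConfigInfo && fileExists) {
            // Case 1: the file appeared, so switch to it and record its content and timestamp.
            useLocalConfigInfo = true;
            localConfigLastModified = fileLastModified;
            content = fileContent;
        } else if (useLocalConfigInfo && !fileExists) {
            // Case 2: the file disappeared, so stop using it (no listener notification).
            useLocalConfigInfo = false;
        } else if (useLocalConfigInfo && localConfigLastModified != fileLastModified) {
            // Case 3: the file changed, so refresh the content and timestamp.
            localConfigLastModified = fileLastModified;
            content = fileContent;
        }
    }
}
```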

8. Checking for config changes

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    Iterator var4 = cacheDatas.iterator();

    while(var4.hasNext()) {
        CacheData cacheData = (CacheData)var4.next();
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(Constants.WORD_SEPARATOR);
            sb.append(cacheData.group).append(Constants.WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(Constants.LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(Constants.WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(Constants.LINE_SEPARATOR);
            }

            if (cacheData.isInitializing()) {
                inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }

    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    // Ask the remote server: it returns right away if something changed, otherwise it holds the request until the timeout
    return this.checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
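To see what the probe string sent to the server looks like, here is a sketch that builds the entry for one config the same way the loop above does. ProbeStringSketch is hypothetical. The separator values (character code 2 between fields, character code 1 between entries) are an assumption about the Nacos Constants class and should be checked against the client version in use.

```java
// Hypothetical helper that formats one listening-config entry.
class ProbeStringSketch {
    // Assumed values of Constants.WORD_SEPARATOR and Constants.LINE_SEPARATOR.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    static String entry(String dataId, String group, String md5, String tenant) {
        StringBuilder sb = new StringBuilder();
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        if (tenant == null || tenant.trim().isEmpty()) {
            // No tenant: the md5 is the last field of the entry.
            sb.append(md5).append(LINE_SEPARATOR);
        } else {
            // With a tenant: the md5 is followed by the tenant.
            sb.append(md5).append(WORD_SEPARATOR);
            sb.append(tenant).append(LINE_SEPARATOR);
        }
        return sb.toString();
    }
}
```

The server compares the md5 in each entry with its own copy, which is how it knows which group keys to report back as changed.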


