SpringCloud源码学习笔记之Eureka服务端初始化

导读:本篇文章讲解 SpringCloud源码学习笔记之Eureka服务端初始化,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1、Eureka服务端初始化流程

  1. 启动类上的@EnableEurekaServer注解,通过@Import引入了EurekaServerMarkerConfiguration配置类,然后该配置了又通过@Bean注入了Marker实例。
  2. SpringBoot加载eureka-server包下META-INF/spring.factories文件,这里主要加载了org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration配置类。
  3. 根据@ConditionalOnBean判断Marker实例是否已经在Spring容器中,然后决定是否初始化EurekaServerAutoConfiguration类。
  4. 初始化EurekaServerAutoConfiguration类时,又通过@Import引入了EurekaServerInitializerConfiguration类,这类就是Eureka服务初始化的入口。
  5. 因为EurekaServerInitializerConfiguration类实现了SmartLifecycle接口,所以会执行其中的start()方法。
  6. 在上面的start()方法中,会调用EurekaServerBootstrap的contextInitialized()方法进行上下文环境的初始化。
  7. 在contextInitialized()方法中又调用了initEurekaEnvironment()和initEurekaServerContext()方法,分别用来初始化环境和Eureka服务上下文。
  8. 调用initEurekaServerContext()方法初始化上下文时,通过调用registry.syncUp()方法,实现服务端集群节点间的数据同步
  9. 同时,又调用了registry.openForTraffic()方法,开启定时进行服务剔除的任务。
  10. 最后,调用EurekaMonitors.registerAllStats()方法,注册监听数据。
  11. 在初始化EurekaServerBootstrap实例时,通过构造函数注入了EurekaServerContext实例对象,实际实现类是DefaultEurekaServerContext,而DefaultEurekaServerContext再实例化完成后,会调用被@PostConstruct注解注释的initialize()方法
  12. 在initialize()方法中,首先会调用peerEurekaNodes.start()方法,该方法会创建一个定时任务线程,通过调用updatePeerEurekaNodes()方法,来更新集群中的节点信息,可能涉及无效节点的移除和新节点的创建。
  13. 在initialize()方法中,还会调用registry.init()方法,该方法中又分别调用了initializedResponseCache()方法,用来创建缓存对象ResponseCacheImpl;调用scheduleRenewalThresholdUpdateTask()方法,创建定时任务,定期刷新续约阈值,实际逻辑通过调用updateRenewalThreshold()方法实现;最后又调用了initRemoteRegionRegistry()方法,初始化远程注册器。

2、@EnableEurekaServer注解

  用于启用Eureka服务(Eureka注册中心)的配置。在这里通过@Import注解引入了EurekaServerMarkerConfiguration配置类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

  在EurekaServerMarkerConfiguration配置类中,主要把一个Marker实例注入到了Spring容器中,后续加载EurekaServerAutoConfiguration配置类时会作为判断条件使用。

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
	@Bean
	public Marker eurekaServerMarkerBean() {
		return new Marker();
	}
	class Marker {
	}
}

3、EurekaServerAutoConfiguration配置类

  EurekaServerAutoConfiguration配置类是通过SpringBoot自动装配机制进行加载的。在eureka-server包下META-INF/spring.factories文件中有其相关配置,如下所示,在SpringBoot进行初始化的时候,会加载EurekaServerAutoConfiguration 配置类。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
	org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration	

  EurekaServerAutoConfiguration配置类的注解,如下:

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {

}

  首先,通过@Import注解,加载EurekaServerInitializerConfiguration配置类,该类是进行初始化的入口。

  然后,通过@ConditionalOnBean调节判断是否加载了Marker标识,即通过@EnableEurekaServer注解加载的标识Marker实例,只有Marker实例存在,该配置类才会进行加载。

  @EnableConfigurationProperties,用于启用对应的配置文件对应的类,这里主要是EurekaDashboardProperties和InstanceRegistryProperties。

  @PropertySource,提供一些属性配置到环境变量。

  同时,EurekaServerAutoConfiguration类实现了WebMvcConfigurer接口,增加了对Spring MVC的自定义配置。

4、EurekaServerInitializerConfiguration类

  在EurekaServerAutoConfiguration配置类中通过@Import注解引入,该配置类是Eureka服务进行初始化的入口。

  EurekaServerInitializerConfiguration类实现了SmartLifecycle接口和ServletContextAware接口。其中,实现ServletContextAware接口,为了在初始化时注入ServletContext对象;而实现SmartLifecycle接口,扩展了该类的生命周期方法,可以在ApplicationContext进行刷新或关闭时执行相关操作。

  关于SmartLifecycle接口的用法请参考《Spring SmartLifecycle 在容器所有bean加载和初始化完毕执行》

@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
		implements ServletContextAware, SmartLifecycle, Ordered {
	
	@Override
	public void setServletContext(ServletContext servletContext) {
		this.servletContext = servletContext;
	}
	
}

  在start()方法中,通过调用EurekaServerBootstrap实例的contextInitialized()进行初始化,其中需要用到ServletContext实例,该实例通过实现ServletContextAware接口被注入。最后在通过ApplicationContext的publishEvent()方法,发布EurekaRegistryAvailableEvent和EurekaServerStartedEvent两个事件,并把正在运行标志running变量设置成true。

@Override
public void start() {
	new Thread(() -> {
		try {
			// TODO: is this class even needed now?
			eurekaServerBootstrap.contextInitialized(
					EurekaServerInitializerConfiguration.this.servletContext);
			log.info("Started Eureka Server");

			publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
			EurekaServerInitializerConfiguration.this.running = true;
			publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
		}
		catch (Exception ex) {
			// Help!
			log.error("Could not initialize Eureka servlet context", ex);
		}
	}).start();
}

  在stop()方法中,通过eurekaServerBootstrap的contextDestroyed()方法实现Eureka服务的停止。

@Override
public void stop() {
	this.running = false;
	eurekaServerBootstrap.contextDestroyed(this.servletContext);
}

5、EurekaServerBootstrap类

  EurekaServerBootstrap类主要用于Eureka服务的初始化、启动和停止等工作。在前面EurekaServerInitializerConfiguration类中的start()方法就是调用了EurekaServerBootstrap类的contextInitialized()方法。

5.1、contextInitialized()方法

  主要实现Eureka服务的初始化,实际由initEurekaEnvironment()和initEurekaServerContext()两个方法完成,代码如下:

public void contextInitialized(ServletContext context) {
	try {
		initEurekaEnvironment();
		initEurekaServerContext();

		context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
	}
	catch (Throwable e) {
		log.error("Cannot bootstrap eureka server :", e);
		throw new RuntimeException("Cannot bootstrap eureka server :", e);
	}
}

  在initEurekaEnvironment()方法,主要配置了archaius.deployment.datacenter和archaius.deployment.environment相关参数。

protected void initEurekaEnvironment() throws Exception {
	log.info("Setting the eureka configuration..");

	String dataCenter = ConfigurationManager.getConfigInstance()
			.getString(EUREKA_DATACENTER);
	if (dataCenter == null) {
		log.info(
				"Eureka data center value eureka.datacenter is not set, defaulting to default");
		ConfigurationManager.getConfigInstance()
				.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
	}
	else {
		ConfigurationManager.getConfigInstance()
				.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
	}
	String environment = ConfigurationManager.getConfigInstance()
			.getString(EUREKA_ENVIRONMENT);
	if (environment == null) {
		ConfigurationManager.getConfigInstance()
				.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
		log.info(
				"Eureka environment value eureka.environment is not set, defaulting to test");
	}
	else {
		ConfigurationManager.getConfigInstance()
				.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
	}
}

  在initEurekaServerContext()方法中,主要初始化Eureka服务节点的信息,包括了集群中节点信息同步、服务剔除等,最后还注册了统计监控器。

protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}
		//保存上下文
		EurekaServerContextHolder.initialize(this.serverContext);
		log.info("Initialized server context");
		// 同步相邻节点上的注册信息
		int registryCount = this.registry.syncUp();
		//更新续约信息,并启动服务剔除的定时任务
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
		// 注册监控工具.
		EurekaMonitors.registerAllStats();
	}

  在initEurekaServerContext()方法中,首先调用了EurekaServerContextHolder的initialize()方法,在该方法中,通过创建EurekaServerContextHolder对象,然后把EurekaServerContext 上下文实例赋值给该对象的一个属性,方便EurekaServerContext 上下文在其他地方的使用。具体实现如下:

public static synchronized void initialize(EurekaServerContext serverContext) {
 	holder = new EurekaServerContextHolder(serverContext);
}

  然后,调用了registry.syncUp()方法,实际上是调用了在PeerAwareInstanceRegistryImpl类中实现的syncUp()方法,主要用来同步相邻节点的注册信息。

@Override
public int syncUp() {
    // 表示从相邻节点同步的数据量
    int count = 0;
	//根据重试次数进行,serverConfig.getRegistrySyncRetries()默认为0,所以初初始化时没有执行。count 当不为0时,表示已经同步到数据了,不再重复执行。
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
    	//每次重试,都会休眠30s(默认时间)
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        //遍历所有的InstanceInfo,把可注册到当前region的InstanceInfo对象进行注册。register()用来注册InstanceInfo,我们后续专门学习。
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

  之后,又调用了registry的openForTraffic()方法,该方法主要用来更新续约数,开启剔除定时任务。在该方法调用过程中,首先调用的是InstanceRegistry类的方法,然后又调用了父类PeerAwareInstanceRegistryImpl的openForTraffic()方法。具体实现如下:

  InstanceRegistry类的openForTraffic()方法

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
	super.openForTraffic(applicationInfoManager,
			count == 0 ? this.defaultOpenForTrafficCount : count);
}

  PeerAwareInstanceRegistryImpl类的openForTraffic()方法

 @Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
	   // 客户端预期应该发送的续约次数(需要乘以因子,即每分钟发送的续约次数)
	   this.expectedNumberOfClientsSendingRenews = count;
	   //更新阈值
	   updateRenewsPerMinThreshold();
	   logger.info("Got {} instances from neighboring DS node", count);
	   logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
	   this.startupTime = System.currentTimeMillis();
	   //当count > 0,表示Eureka服务有数据需要从相邻节点同步,所以这时不允许被 Eureka客户端获取注册信息
	   if (count > 0) {
	       this.peerInstancesTransferEmptyOnStartup = false;
	   }
	   //使用Aws时需要的判断
	   DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
	   boolean isAws = Name.Amazon == selfName;
	   if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
	       logger.info("Priming AWS connections for all replicas..");
	       primeAwsReplicas(applicationInfoManager);
	   }
	   logger.info("Changing status to UP");
	   //设置状态
	   applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
	   //开启周期性服务剔除任务
	   super.postInit();
}

  在上述的openForTraffic()方法中,最后又调用了 super.postInit()方法,实际上是调用了AbstractInstanceRegistry类中的postInit()方法,该方法主要实现了周期性的执行服务剔除任务EvictionTask,主要逻辑如下:

protected void postInit() {
 	renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    //周期性(默认每60s)执行一次服务剔除任务EvictionTask,关于服务剔除的逻辑后续专门章节学习。
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
5.2、contextDestroyed()方法

  该方法主要用来关闭Eureka服务器。主要做了三件事:首先,从容器上下文中,移除了Eureka上下文信息;然后,通过destroyEurekaServerContext()方法关闭Eureka上下文,同时会关闭监控工具等;最后,通过destroyEurekaEnvironment()方法清除Eureka环境信息,该方法一般由开发者实现。

public void contextDestroyed(ServletContext context) {
	try {
		log.info("Shutting down Eureka Server..");
		context.removeAttribute(EurekaServerContext.class.getName());

		destroyEurekaServerContext();
		destroyEurekaEnvironment();

	}
	catch (Throwable e) {
		log.error("Error shutting down eureka", e);
	}
	log.info("Eureka Service is now shutdown...");
}

6、EurekaServerContext

  在EurekaServerAutoConfiguration配置类中,通过@Bean注解装配了EurekaServerContext对象,实际上是装配了DefaultEurekaServerContext实现类的对象。在该类初始化完成后,会调用被@PostConstruct注解注释的initialize()方法。EurekaServerContext对象,后续会在初始化构建EurekaServerBootstrap对象时,作为构造函数参数传入进去,不过需要注意的是initialize()方法执行,是在构建EurekaServerBootstrap对象之前进行的。具体实现如下:

6.1、EurekaServerContext对象的initialize()方法
@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

  该方法被注解@PostConstruct修饰,所以在EurekaServerContext上下文对象初始化完成后,会被调用。

  然后,该方法中又会调用peerEurekaNodes的start()方法和registry的init()方法。

6.1.1、peerEurekaNodes的start()方法

  这里的PeerEurekaNodes对象实际上是在EurekaServerAutoConfiguration配置类中定义的内部类RefreshablePeerEurekaNodes,该对象也是在这个配置类中通过@Bean注解注入的,不过start()方法还是在父类PeerEurekaNodes中定义的,如下:

public void start() {
	//创建一个单一(串行)的定时执行任务的线程
   taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                	//创建线程,并命名
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    //设置守护线程
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
    	//更新集群中的Eureka服务节点,涉及新建和移除节点
        updatePeerEurekaNodes(resolvePeerUrls());
        //创建更新集群中的Eureka服务节点的线程任务
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        //执行周期性任务,更新集群中的Eureka服务节点(默认十分钟执行一次)。
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}

  updatePeerEurekaNodes()方法主要用来更新集群中的Eureka服务节点,其中会涉及到不可用节点的移除和可用节点的新建。

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
   if (newPeerUrls.isEmpty()) {
       logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
       return;
   }

   Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
   //用来记录需要停用的Eureka服务的URL,求toShutdown与newPeerUrls的差集,即toShutdown中有而newPeerUrls中没有的元素。初始情况下,该集合为空。
   toShutdown.removeAll(newPeerUrls);
   Set<String> toAdd = new HashSet<>(newPeerUrls);
   //用来记录需要新建的Eureka服务的URL,求toAdd与peerEurekaNodeUrls的差集,即toAdd中有而peerEurekaNodeUrls中没有的元素。初始情况下,该集合为newPeerUrls集合。
   toAdd.removeAll(peerEurekaNodeUrls);
	//同时为空,即没有变化,直接返回。
   if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
       return;
   }

   // 当前的所有服务节点
   List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
	//如果toShutdown不为null,则移除对应的节点
   if (!toShutdown.isEmpty()) {
       logger.info("Removing no longer available peer nodes {}", toShutdown);
       int i = 0;
       while (i < newNodeList.size()) {
           PeerEurekaNode eurekaNode = newNodeList.get(i);
           if (toShutdown.contains(eurekaNode.getServiceUrl())) {
               newNodeList.remove(i);
               eurekaNode.shutDown();
           } else {
               i++;
           }
       }
   }

   // 新增节点
   if (!toAdd.isEmpty()) {
       logger.info("Adding new peer nodes {}", toAdd);
       for (String peerUrl : toAdd) {
           newNodeList.add(createPeerEurekaNode(peerUrl));
       }
   }
	//重新赋值两个全局变量
   this.peerEurekaNodes = newNodeList;
   this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
6.1.2、registry的init()方法

  这里的registry实例,也是在EurekaServerAutoConfiguration配置类中通过@Bean注入的InstanceRegistry实例。不过这里调用的init()方法是在父类PeerAwareInstanceRegistryImpl中实现的。该方法主要实现了服务端要初始化的逻辑,具体如下:

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    //缓存服务端的注册信息,用于客户端的请求
    initializedResponseCache();
    //定时更新阈值(默认十五分钟一次)
    scheduleRenewalThresholdUpdateTask();
    //用于初始化处理跨Region的注册信息的RemoteRegionRegistry
    initRemoteRegionRegistry();

    try {
    	//创建监听器
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

  该方法涉及到的方法,我们在后面学习Eureka服务注册、续约、服务剔除等内容时再详细分析。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/68767.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!