1、Eureka服务端初始化流程
- 启动类上的@EnableEurekaServer注解,通过@Import引入了EurekaServerMarkerConfiguration配置类,然后该配置了又通过@Bean注入了Marker实例。
- SpringBoot加载eureka-server包下META-INF/spring.factories文件,这里主要加载了org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration配置类。
- 根据@ConditionalOnBean判断Marker实例是否已经在Spring容器中,然后决定是否初始化EurekaServerAutoConfiguration类。
- 初始化EurekaServerAutoConfiguration类时,又通过@Import引入了EurekaServerInitializerConfiguration类,这类就是Eureka服务初始化的入口。
- 因为EurekaServerInitializerConfiguration类实现了SmartLifecycle接口,所以会执行其中的start()方法。
- 在上面的start()方法中,会调用EurekaServerBootstrap的contextInitialized()方法进行上下文环境的初始化。
- 在contextInitialized()方法中又调用了initEurekaEnvironment()和initEurekaServerContext()方法,分别用来初始化环境和Eureka服务上下文。
- 调用initEurekaServerContext()方法初始化上下文时,通过调用registry.syncUp()方法,实现服务端集群节点间的数据同步
- 同时,又调用了registry.openForTraffic()方法,开启定时进行服务剔除的任务。
- 最后,调用EurekaMonitors.registerAllStats()方法,注册监听数据。
- 在初始化EurekaServerBootstrap实例时,通过构造函数注入了EurekaServerContext实例对象,实际实现类是DefaultEurekaServerContext,而DefaultEurekaServerContext再实例化完成后,会调用被@PostConstruct注解注释的initialize()方法
- 在initialize()方法中,首先会调用peerEurekaNodes.start()方法,该方法会创建一个定时任务线程,通过调用updatePeerEurekaNodes()方法,来更新集群中的节点信息,可能涉及无效节点的移除和新节点的创建。
- 在initialize()方法中,还会调用registry.init()方法,该方法中又分别调用了initializedResponseCache()方法,用来创建缓存对象ResponseCacheImpl;调用scheduleRenewalThresholdUpdateTask()方法,创建定时任务,定期刷新续约阈值,实际逻辑通过调用updateRenewalThreshold()方法实现;最后又调用了initRemoteRegionRegistry()方法,初始化远程注册器。
2、@EnableEurekaServer注解
用于启用Eureka服务(Eureka注册中心)的配置。在这里通过@Import注解引入了EurekaServerMarkerConfiguration配置类。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
在EurekaServerMarkerConfiguration配置类中,主要把一个Marker实例注入到了Spring容器中,后续加载EurekaServerAutoConfiguration配置类时会作为判断条件使用。
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
@Bean
public Marker eurekaServerMarkerBean() {
return new Marker();
}
class Marker {
}
}
3、EurekaServerAutoConfiguration配置类
EurekaServerAutoConfiguration配置类是通过SpringBoot自动装配机制进行加载的。在eureka-server包下META-INF/spring.factories文件中有其相关配置,如下所示,在SpringBoot进行初始化的时候,会加载EurekaServerAutoConfiguration 配置类。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
EurekaServerAutoConfiguration配置类的注解,如下:
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
}
首先,通过@Import注解,加载EurekaServerInitializerConfiguration配置类,该类是进行初始化的入口。
然后,通过@ConditionalOnBean调节判断是否加载了Marker标识,即通过@EnableEurekaServer注解加载的标识Marker实例,只有Marker实例存在,该配置类才会进行加载。
@EnableConfigurationProperties,用于启用对应的配置文件对应的类,这里主要是EurekaDashboardProperties和InstanceRegistryProperties。
@PropertySource,提供一些属性配置到环境变量。
同时,EurekaServerAutoConfiguration类实现了WebMvcConfigurer接口,增加了对Spring MVC的自定义配置。
4、EurekaServerInitializerConfiguration类
在EurekaServerAutoConfiguration配置类中通过@Import注解引入,该配置类是Eureka服务进行初始化的入口。
EurekaServerInitializerConfiguration类实现了SmartLifecycle接口和ServletContextAware接口。其中,实现ServletContextAware接口,为了在初始化时注入ServletContext对象;而实现SmartLifecycle接口,扩展了该类的生命周期方法,可以在ApplicationContext进行刷新或关闭时执行相关操作。
关于SmartLifecycle接口的用法请参考《Spring SmartLifecycle 在容器所有bean加载和初始化完毕执行》。
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
@Override
public void setServletContext(ServletContext servletContext) {
this.servletContext = servletContext;
}
}
在start()方法中,通过调用EurekaServerBootstrap实例的contextInitialized()进行初始化,其中需要用到ServletContext实例,该实例通过实现ServletContextAware接口被注入。最后在通过ApplicationContext的publishEvent()方法,发布EurekaRegistryAvailableEvent和EurekaServerStartedEvent两个事件,并把正在运行标志running变量设置成true。
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
在stop()方法中,通过eurekaServerBootstrap的contextDestroyed()方法实现Eureka服务的停止。
@Override
public void stop() {
this.running = false;
eurekaServerBootstrap.contextDestroyed(this.servletContext);
}
5、EurekaServerBootstrap类
EurekaServerBootstrap类主要用于Eureka服务的初始化、启动和停止等工作。在前面EurekaServerInitializerConfiguration类中的start()方法就是调用了EurekaServerBootstrap类的contextInitialized()方法。
5.1、contextInitialized()方法
主要实现Eureka服务的初始化,实际由initEurekaEnvironment()和initEurekaServerContext()两个方法完成,代码如下:
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
在initEurekaEnvironment()方法,主要配置了archaius.deployment.datacenter和archaius.deployment.environment相关参数。
protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");
String dataCenter = ConfigurationManager.getConfigInstance()
.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
String environment = ConfigurationManager.getConfigInstance()
.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
}
}
在initEurekaServerContext()方法中,主要初始化Eureka服务节点的信息,包括了集群中节点信息同步、服务剔除等,最后还注册了统计监控器。
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
//保存上下文
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// 同步相邻节点上的注册信息
int registryCount = this.registry.syncUp();
//更新续约信息,并启动服务剔除的定时任务
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// 注册监控工具.
EurekaMonitors.registerAllStats();
}
在initEurekaServerContext()方法中,首先调用了EurekaServerContextHolder的initialize()方法,在该方法中,通过创建EurekaServerContextHolder对象,然后把EurekaServerContext 上下文实例赋值给该对象的一个属性,方便EurekaServerContext 上下文在其他地方的使用。具体实现如下:
public static synchronized void initialize(EurekaServerContext serverContext) {
holder = new EurekaServerContextHolder(serverContext);
}
然后,调用了registry.syncUp()方法,实际上是调用了在PeerAwareInstanceRegistryImpl类中实现的syncUp()方法,主要用来同步相邻节点的注册信息。
@Override
public int syncUp() {
// 表示从相邻节点同步的数据量
int count = 0;
//根据重试次数进行,serverConfig.getRegistrySyncRetries()默认为0,所以初初始化时没有执行。count 当不为0时,表示已经同步到数据了,不再重复执行。
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
//每次重试,都会休眠30s(默认时间)
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//遍历所有的InstanceInfo,把可注册到当前region的InstanceInfo对象进行注册。register()用来注册InstanceInfo,我们后续专门学习。
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
之后,又调用了registry的openForTraffic()方法,该方法主要用来更新续约数,开启剔除定时任务。在该方法调用过程中,首先调用的是InstanceRegistry类的方法,然后又调用了父类PeerAwareInstanceRegistryImpl的openForTraffic()方法。具体实现如下:
InstanceRegistry类的openForTraffic()方法
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
super.openForTraffic(applicationInfoManager,
count == 0 ? this.defaultOpenForTrafficCount : count);
}
PeerAwareInstanceRegistryImpl类的openForTraffic()方法
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// 客户端预期应该发送的续约次数(需要乘以因子,即每分钟发送的续约次数)
this.expectedNumberOfClientsSendingRenews = count;
//更新阈值
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
//当count > 0,表示Eureka服务有数据需要从相邻节点同步,所以这时不允许被 Eureka客户端获取注册信息
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
//使用Aws时需要的判断
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
//设置状态
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
//开启周期性服务剔除任务
super.postInit();
}
在上述的openForTraffic()方法中,最后又调用了 super.postInit()方法,实际上是调用了AbstractInstanceRegistry类中的postInit()方法,该方法主要实现了周期性的执行服务剔除任务EvictionTask,主要逻辑如下:
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
//周期性(默认每60s)执行一次服务剔除任务EvictionTask,关于服务剔除的逻辑后续专门章节学习。
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
5.2、contextDestroyed()方法
该方法主要用来关闭Eureka服务器。主要做了三件事:首先,从容器上下文中,移除了Eureka上下文信息;然后,通过destroyEurekaServerContext()方法关闭Eureka上下文,同时会关闭监控工具等;最后,通过destroyEurekaEnvironment()方法清除Eureka环境信息,该方法一般由开发者实现。
public void contextDestroyed(ServletContext context) {
try {
log.info("Shutting down Eureka Server..");
context.removeAttribute(EurekaServerContext.class.getName());
destroyEurekaServerContext();
destroyEurekaEnvironment();
}
catch (Throwable e) {
log.error("Error shutting down eureka", e);
}
log.info("Eureka Service is now shutdown...");
}
6、EurekaServerContext
在EurekaServerAutoConfiguration配置类中,通过@Bean注解装配了EurekaServerContext对象,实际上是装配了DefaultEurekaServerContext实现类的对象。在该类初始化完成后,会调用被@PostConstruct注解注释的initialize()方法。EurekaServerContext对象,后续会在初始化构建EurekaServerBootstrap对象时,作为构造函数参数传入进去,不过需要注意的是initialize()方法执行,是在构建EurekaServerBootstrap对象之前进行的。具体实现如下:
6.1、EurekaServerContext对象的initialize()方法
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
该方法被注解@PostConstruct修饰,所以在EurekaServerContext上下文对象初始化完成后,会被调用。
然后,该方法中又会调用peerEurekaNodes的start()方法和registry的init()方法。
6.1.1、peerEurekaNodes的start()方法
这里的PeerEurekaNodes对象实际上是在EurekaServerAutoConfiguration配置类中定义的内部类RefreshablePeerEurekaNodes,该对象也是在这个配置类中通过@Bean注解注入的,不过start()方法还是在父类PeerEurekaNodes中定义的,如下:
public void start() {
//创建一个单一(串行)的定时执行任务的线程
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
//创建线程,并命名
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
//设置守护线程
thread.setDaemon(true);
return thread;
}
}
);
try {
//更新集群中的Eureka服务节点,涉及新建和移除节点
updatePeerEurekaNodes(resolvePeerUrls());
//创建更新集群中的Eureka服务节点的线程任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
//执行周期性任务,更新集群中的Eureka服务节点(默认十分钟执行一次)。
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
updatePeerEurekaNodes()方法主要用来更新集群中的Eureka服务节点,其中会涉及到不可用节点的移除和可用节点的新建。
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
//用来记录需要停用的Eureka服务的URL,求toShutdown与newPeerUrls的差集,即toShutdown中有而newPeerUrls中没有的元素。初始情况下,该集合为空。
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
//用来记录需要新建的Eureka服务的URL,求toAdd与peerEurekaNodeUrls的差集,即toAdd中有而peerEurekaNodeUrls中没有的元素。初始情况下,该集合为newPeerUrls集合。
toAdd.removeAll(peerEurekaNodeUrls);
//同时为空,即没有变化,直接返回。
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// 当前的所有服务节点
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
//如果toShutdown不为null,则移除对应的节点
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 新增节点
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
//重新赋值两个全局变量
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
6.1.2、registry的init()方法
这里的registry实例,也是在EurekaServerAutoConfiguration配置类中通过@Bean注入的InstanceRegistry实例。不过这里调用的init()方法是在父类PeerAwareInstanceRegistryImpl中实现的。该方法主要实现了服务端要初始化的逻辑,具体如下:
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//缓存服务端的注册信息,用于客户端的请求
initializedResponseCache();
//定时更新阈值(默认十五分钟一次)
scheduleRenewalThresholdUpdateTask();
//用于初始化处理跨Region的注册信息的RemoteRegionRegistry
initRemoteRegionRegistry();
try {
//创建监听器
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
该方法涉及到的方法,我们在后面学习Eureka服务注册、续约、服务剔除等内容时再详细分析。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/68767.html