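Eureka Cluster Overview
The diagram shows the overall workflow of a Eureka cluster. The cluster has 3 nodes and 3 application services. The service registry is a two-level Map: the first-level key is the service name, the second-level key is the instance name, and the value is the instance information (IP, port, URL, and so on).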
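The two-level Map described above can be sketched in plain Java. This is a minimal illustration, not Eureka's actual registry class: `RegistrySketch`, its `InstanceInfo` holder and the service names are hypothetical stand-ins chosen for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {
    // Hypothetical, simplified stand-in for Eureka's instance information.
    public static final class InstanceInfo {
        public final String ip;
        public final int port;
        public final String url;

        public InstanceInfo(String ip, int port, String url) {
            this.ip = ip;
            this.port = port;
            this.url = url;
        }
    }

    // First-level key: service name. Second-level key: instance name.
    private final Map<String, Map<String, InstanceInfo>> registry = new ConcurrentHashMap<>();

    public void register(String serviceName, String instanceName, InstanceInfo info) {
        registry.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>())
                .put(instanceName, info);
    }

    public InstanceInfo lookup(String serviceName, String instanceName) {
        Map<String, InstanceInfo> instances = registry.get(serviceName);
        return instances == null ? null : instances.get(instanceName);
    }

    public int instanceCount(String serviceName) {
        Map<String, InstanceInfo> instances = registry.get(serviceName);
        return instances == null ? 0 : instances.size();
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("order-service", "order-1", new InstanceInfo("10.0.0.1", 8080, "http://10.0.0.1:8080"));
        registry.register("order-service", "order-2", new InstanceInfo("10.0.0.2", 8080, "http://10.0.0.2:8080"));
        System.out.println(registry.instanceCount("order-service")); // prints 2
    }
}
```

Keying by service name first keeps all instances of one service together, which is what a client needs when it asks for every instance of a given service.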
@EnableEurekaServer Auto-Configuration
Step 1: @EnableEurekaServer imports EurekaServerMarkerConfiguration, which registers a Marker bean
Let's look inside the @EnableEurekaServer annotation:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
The key part is @Import(EurekaServerMarkerConfiguration.class). Here is its source:
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }
    class Marker {
    }
}
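At this point the Spring context contains a Marker instance.
Step 2: Auto-configuring the EurekaServerAutoConfiguration class
spring-cloud-netflix-eureka-server/META-INF/spring.factories registers the auto-configuration class EurekaServerAutoConfiguration. It is only activated when the condition @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) is satisfied, that is, when a Marker instance exists, which the previous step already ensured.
Configuration code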
spring.factories:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
The EurekaServerAutoConfiguration class:
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
...
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
EurekaServerAutoConfiguration mostly registers beans. The class also carries a key annotation, @Import(EurekaServerInitializerConfiguration.class), whose code is as follows:
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
    private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
    // Server-side configuration
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    private ServletContext servletContext;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;
    private boolean running;
    private int order = 1;
    @Override
    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }
    @Override
    public void start() {
        new Thread(() -> {
            try {
                // Initialize the Eureka server context
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }).start();
    }
    // Returns the Eureka server configuration
    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }
    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }
    @Override
    public void stop() {
        this.running = false;
        eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }
    @Override
    public boolean isRunning() {
        return this.running;
    }
    // Lifecycle phase used to order startup
    @Override
    public int getPhase() {
        return 0;
    }
    // Whether to start automatically; true here
    @Override
    public boolean isAutoStartup() {
        return true;
    }
    @Override
    public int getOrder() {
        return this.order;
    }
}
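This class implements SmartLifecycle, which extends the Lifecycle interface. The main logic is in start(), which launches the Eureka server on a separate thread.
Step 3: Who calls EurekaServerInitializerConfiguration.start()?
When Spring initializes the application context, the main work happens in AbstractApplicationContext.refresh(), which drives bean creation and dependency injection, including lifecycle handling: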
public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext {
    @Override
    public void refresh() throws BeansException, IllegalStateException {
        ...
        finishRefresh();
    }
    protected void finishRefresh() {
        ...
        // Propagate the refresh to the lifecycle processor
        getLifecycleProcessor().onRefresh();
        ...
    }
}
LifecycleProcessor is an interface. Its default implementation is DefaultLifecycleProcessor, which implements the onRefresh() method:
@Override
public void onRefresh() {
    startBeans(true);
    this.running = true;
}
private void startBeans(boolean autoStartupOnly) {
    Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
    Map<Integer, LifecycleGroup> phases = new TreeMap<>();
    lifecycleBeans.forEach((beanName, bean) -> {
        if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
            int phase = getPhase(bean);
            phases.computeIfAbsent(
                    phase,
                    p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
            ).add(beanName, bean);
        }
    });
    if (!phases.isEmpty()) {
        phases.values().forEach(LifecycleGroup::start);
    }
}
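The startBeans() method works roughly as follows:
- Get all beans that implement the Lifecycle interface
- Keep the ones that are SmartLifecycle instances and whose isAutoStartup() returns true
- Group them by getPhase() into a TreeMap, where the key is the phase and the value is a LifecycleGroup
- Iterate over the map and call start() on each LifecycleGroup
- This ends up calling bean.start(), which here is EurekaServerInitializerConfiguration.start()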
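The grouping and ordering steps listed above can be sketched without Spring. This is a simplified illustration, not Spring's DefaultLifecycleProcessor: the `Startable` interface, `SimpleBean` and the bean names are hypothetical, and the sketch only covers the auto-startup path.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PhasedStartSketch {
    // Hypothetical stand-in for SmartLifecycle, reduced to what the sketch needs.
    public interface Startable {
        boolean isAutoStartup();
        int getPhase();
        void start();
    }

    public static final class SimpleBean implements Startable {
        private final boolean autoStartup;
        private final int phase;
        public boolean started;

        public SimpleBean(boolean autoStartup, int phase) {
            this.autoStartup = autoStartup;
            this.phase = phase;
        }

        @Override public boolean isAutoStartup() { return autoStartup; }
        @Override public int getPhase() { return phase; }
        @Override public void start() { started = true; }
    }

    // Groups auto-startup beans by phase, then starts the groups in ascending phase order.
    public static List<String> startAll(Map<String, Startable> beans) {
        Map<Integer, List<String>> phases = new TreeMap<>();
        beans.forEach((name, bean) -> {
            if (bean.isAutoStartup()) {
                phases.computeIfAbsent(bean.getPhase(), p -> new ArrayList<>()).add(name);
            }
        });
        List<String> startOrder = new ArrayList<>();
        for (List<String> group : phases.values()) {
            for (String name : group) {
                beans.get(name).start();
                startOrder.add(name);
            }
        }
        return startOrder;
    }

    public static void main(String[] args) {
        Map<String, Startable> beans = new LinkedHashMap<>();
        beans.put("late", new SimpleBean(true, 10));
        beans.put("eurekaInitializer", new SimpleBean(true, 0));
        beans.put("manual", new SimpleBean(false, -5));
        System.out.println(startAll(beans)); // prints [eurekaInitializer, late]
    }
}
```

The TreeMap is what gives the ascending phase order; the bean with isAutoStartup() returning false is never started, regardless of its phase.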
At this point @EnableEurekaServer has essentially done its job: it has triggered the Eureka server initialization code.
EurekaServer Startup Process
Step 1: The EurekaServerInitializerConfiguration.start() method
@Override
public void start() {
    new Thread(() -> {
        try {
            // Initialize the Eureka server context
            eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            // Publish the registry-available event
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            EurekaServerInitializerConfiguration.this.running = true;
            // Publish the server-started event
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        }
        catch (Exception ex) {
            // Help!
            log.error("Could not initialize Eureka servlet context", ex);
        }
    }).start();
}
The key call is contextInitialized(). The relevant EurekaServerBootstrap code is:
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        // Initialize the Eureka server context
        initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
    // AWS-specific handling
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry,
                this.applicationInfoManager);
        this.awsBinder.start();
    }
    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    // Copy the registry from a neighboring Eureka node
    int registryCount = this.registry.syncUp();
    // Open for traffic: mark this server UP and start the eviction task
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    // Register all monitoring statistics
    EurekaMonitors.registerAllStats();
}
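In short, this method syncs the registry from neighboring nodes, opens the server for traffic (which also starts the eviction logic), and registers all monitoring statistics.
Step 2: this.registry.syncUp() syncs the service list from peer Eureka nodes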
// Copy the entire registry from a neighboring DS node
public int syncUp() {
    int count = 0;
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Get the applications that the local Eureka client has fetched from a peer node
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // Register the instance locally if it is registerable
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
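The retry loop in syncUp() keeps trying until at least one instance has been registered or the retry limit is reached, sleeping between attempts. A minimal standalone sketch of that control flow follows; `SyncRetrySketch` and the `fetchAndRegister` supplier are hypothetical stand-ins for the peer fetch and local registration, not Eureka APIs.

```java
import java.util.function.IntSupplier;

public class SyncRetrySketch {
    // Retries until fetchAndRegister reports a non-zero count or maxRetries attempts were made.
    public static int syncUp(int maxRetries, long retryWaitMs, IntSupplier fetchAndRegister) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    // Wait before every attempt except the first one.
                    Thread.sleep(retryWaitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            count = fetchAndRegister.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical peer: returns nothing on the first two attempts, then 4 instances.
        int registered = syncUp(5, 10, () -> ++attempts[0] < 3 ? 0 : 4);
        System.out.println(registered + " instances after " + attempts[0] + " attempts");
        // prints "4 instances after 3 attempts"
    }
}
```

Note that a return value of 0 is a valid outcome: it simply means no peer data was available within the allowed retries, which is what the first node of a new cluster would see.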
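Step 3: Opening for traffic and starting eviction: this.registry.openForTraffic(this.applicationInfoManager, registryCount);
The method is declared on PeerAwareInstanceRegistry (Netflix's implementation lives in PeerAwareInstanceRegistryImpl):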
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds
    this.expectedNumberOfClientsSendingRenews = count;
    // Recompute the expected minimum number of renewals per minute
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    // AWS-specific handling
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Set this Eureka server instance's status to UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Start the scheduled EvictionTask; by default it runs every 60 seconds
    super.postInit();
}
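The article does not show the body of updateRenewsPerMinThreshold(). The sketch below assumes the threshold is the expected client count, times the number of renewals per client per minute, times a renewal percentage. The 30-second interval comes from the comment in the code above; the 0.85 percentage is an assumed default for illustration. `RenewThresholdSketch` and its parameter names are hypothetical.

```java
public class RenewThresholdSketch {
    // Assumed formula: clients * (60 / renewal interval in seconds) * renewal percent threshold.
    public static int renewsPerMinThreshold(int expectedClients,
                                            int renewalIntervalSeconds,
                                            double renewalPercentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // 10 clients renewing every 30 seconds give 20 expected renewals per minute;
        // with an assumed 0.85 percentage the threshold is 17.
        System.out.println(renewsPerMinThreshold(10, 30, 0.85)); // prints 17
    }
}
```

With a count of 0 from syncUp() the threshold under this formula is 0 as well, so a node that starts with an empty registry has no minimum renewal rate until clients register.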
The scheduled eviction task (EvictionTask) is set up as follows:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    ...
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // task: the eviction check task
        // delay: delay before the first run
        // period: interval between runs, 60 seconds by default and configurable
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
}
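The cancel-then-reschedule pattern in postInit() uses java.util.Timer with the same value for the initial delay and the period. A small standalone sketch of that pattern is shown here; `EvictionTimerSketch` and `firesRepeatedly` are hypothetical names, and the short intervals are chosen only so the example finishes quickly.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class EvictionTimerSketch {
    private final Timer timer = new Timer("eviction-sketch", true); // daemon thread
    private final AtomicReference<TimerTask> taskRef = new AtomicReference<>();

    // Cancels any previously scheduled task, then schedules the new one with delay == period.
    public void schedule(Runnable work, long intervalMs) {
        TimerTask previous = taskRef.get();
        if (previous != null) {
            previous.cancel();
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                work.run();
            }
        };
        taskRef.set(task);
        timer.schedule(task, intervalMs, intervalMs);
    }

    public void shutdown() {
        timer.cancel();
    }

    // Returns true if the task fired `runs` times before the timeout elapsed.
    public static boolean firesRepeatedly(int runs, long intervalMs, long timeoutMs) {
        EvictionTimerSketch sketch = new EvictionTimerSketch();
        CountDownLatch latch = new CountDownLatch(runs);
        sketch.schedule(latch::countDown, intervalMs);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firesRepeatedly(3, 20, 2000));
    }
}
```

Because the initial delay equals the period, the first eviction pass does not run immediately at startup; it runs one full interval later.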
The EvictionTask code that performs the eviction:
class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            // Perform the eviction
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
    ...
}
// Evict expired instances
public void evict(long additionalLeaseMs) {
    ...
    // Leases that have expired
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    // Two-level Map: first-level key is the service name, second-level key is the instance name
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            // Evict the instance
            internalCancel(appName, id, false);
        }
    }
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    read.lock();
    try {
        // Get all instances registered under this service name
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // Remove the instance
            leaseToCancel = gMap.remove(id);
        }
        ...
    } finally {
        read.unlock();
    }
    ...
    return true;
}
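Two details of evict() are easy to miss: the number of evictions per pass is capped so the registry never drops below the renewal percentage in one go, and the victims are chosen at random by running only the first few steps of a Knuth (Fisher-Yates) shuffle. A standalone sketch of both follows; `EvictionLimitSketch` and its method names are hypothetical, and 0.85 is an assumed value for the renewal percent threshold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class EvictionLimitSketch {
    // Number of expired leases that may be evicted in a single pass.
    public static int toEvict(int registrySize, int expiredCount, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        return Math.min(expiredCount, evictionLimit);
    }

    // Picks `count` distinct random items by running only the first `count` shuffle steps.
    public static <T> List<T> pickRandom(List<T> items, int count, Random random) {
        List<T> copy = new ArrayList<>(items);
        for (int i = 0; i < count; i++) {
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, count));
    }

    public static void main(String[] args) {
        // 100 registered instances, 20 expired, assumed threshold 0.85:
        // at most 100 - 85 = 15 may be evicted in this pass.
        int limit = toEvict(100, 20, 0.85);
        System.out.println(limit); // prints 15

        List<Integer> expired = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            expired.add(i);
        }
        System.out.println(pickRandom(expired, limit, new Random()).size()); // prints 15
    }
}
```

Choosing victims at random spreads evictions across services, so a single application is less likely to lose all of its instances in one pass when the cap applies.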
This completes the walkthrough of the Eureka server source.
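@EnableEurekaClient Auto-Configuration
For a Eureka client, the @EnableEurekaClient annotation is optional, because Spring Boot auto-configuration discovers and loads the client configuration by default.
spring-cloud-netflix-eureka-client/META-INF/spring.factories: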
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
org.springframework.boot.Bootstrapper=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapper
There are more entries here than on the server side. The key one is EurekaClientAutoConfiguration:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
    ...
}
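As shown, auto-configuration requires three conditions to hold:
- @ConditionalOnClass(EurekaClientConfig.class): the EurekaClientConfig class must be on the classpath
- @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true): the eureka.client.enabled property must not be set to false; this is a configuration property, not the @EnableEurekaClient annotation, and when it is not configured the condition defaults to true
- @ConditionalOnDiscoveryEnabled: backed by the spring.cloud.discovery.enabled property, which also defaults to true and does not need to be configured:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@ConditionalOnProperty(value = {"spring.cloud.discovery.enabled"}, matchIfMissing = true)
public @interface ConditionalOnDiscoveryEnabled {
}
So by default every condition is satisfied and the Eureka client starts automatically.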
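The effect of matchIfMissing = true on the two property-based conditions can be illustrated with a small plain-Java sketch. It is a simplification, not Spring Boot's condition evaluation: `MatchIfMissingSketch` and its methods are hypothetical, properties are modeled as a plain Map, and the classpath check from @ConditionalOnClass is left out.

```java
import java.util.HashMap;
import java.util.Map;

public class MatchIfMissingSketch {
    // A missing property falls back to matchIfMissing; a present one matches unless it is "false".
    public static boolean matches(Map<String, String> props, String name, boolean matchIfMissing) {
        String value = props.get(name);
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }

    // Combines the two property conditions placed on the client auto-configuration.
    public static boolean clientEnabled(Map<String, String> props) {
        return matches(props, "eureka.client.enabled", true)
                && matches(props, "spring.cloud.discovery.enabled", true);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(clientEnabled(props)); // prints true: nothing configured

        props.put("eureka.client.enabled", "false");
        System.out.println(clientEnabled(props)); // prints false: explicitly disabled
    }
}
```

This is why an application with the client starter on the classpath registers with Eureka without any annotation or property, and why setting either property to false is enough to switch the client off.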
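Summary
This article walked through the auto-configuration mechanisms used by Eureka and gave a brief look at the Eureka server source. For more depth, read the source code directly.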