Microservice Governance with Eureka: A Brief Source Code Analysis


Eureka Cluster Overview

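[Figure: workflow of a Eureka cluster]
The figure shows the overall workflow of a Eureka cluster. The cluster has 3 nodes and 3 application services. The service list is stored as a two-level Map: the first-level key is the service name, the second-level key is the instance name, and the value is the instance information (ip, port, url, and so on).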
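
To make the two-level Map concrete, here is a minimal sketch in plain Java. The class TwoLevelRegistrySketch and its nested Instance type are hypothetical names used for illustration only; the real registry stores Lease<InstanceInfo> values and carries far more state.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TwoLevelRegistrySketch {
    // Hypothetical stand-in for the instance information (ip, port, url).
    static final class Instance {
        final String ip;
        final int port;
        final String url;

        Instance(String ip, int port, String url) {
            this.ip = ip;
            this.port = port;
            this.url = url;
        }
    }

    // First level: service name. Second level: instance name.
    private final Map<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    void register(String serviceName, String instanceName, Instance info) {
        registry.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>())
                .put(instanceName, info);
    }

    Instance lookup(String serviceName, String instanceName) {
        Map<String, Instance> instances = registry.get(serviceName);
        return instances == null ? null : instances.get(instanceName);
    }

    int instanceCount(String serviceName) {
        Map<String, Instance> instances = registry.get(serviceName);
        return instances == null ? 0 : instances.size();
    }
}
```

Looking up an instance is then two map reads: first by service name, then by instance name.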

@EnableEurekaServer Auto-Configuration

Step 1: The EurekaServerMarkerConfiguration class registers a Marker bean, which later activates the auto-configuration

Let's look inside the @EnableEurekaServer annotation:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

The key line is @Import(EurekaServerMarkerConfiguration.class). Here is its source:

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
	@Bean
	public Marker eurekaServerMarkerBean() {
		return new Marker();
	}

	class Marker {
	}
}

At this point, the Spring context contains a Marker instance.

Step 2: Auto-configuring the EurekaServerAutoConfiguration class

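spring-cloud-netflix-eureka-server/META-INF/spring.factories declares an auto-configuration class, EurekaServerAutoConfiguration. It is activated only when the condition @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) is satisfied, that is, when a Marker instance exists, which step 1 has already ensured.
The relevant entry in spring.factories: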

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

The EurekaServerAutoConfiguration configuration class:

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
...
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
	...
}

EurekaServerAutoConfiguration mostly registers beans. The class also carries a key annotation, @Import(EurekaServerInitializerConfiguration.class), whose code is:

@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {

	private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);

	// Server-side configuration
	@Autowired
	private EurekaServerConfig eurekaServerConfig;

	private ServletContext servletContext;

	@Autowired
	private ApplicationContext applicationContext;

	@Autowired
	private EurekaServerBootstrap eurekaServerBootstrap;

	private boolean running;

	private int order = 1;

	@Override
	public void setServletContext(ServletContext servletContext) {
		this.servletContext = servletContext;
	}

	@Override
	public void start() {
		new Thread(() -> {
			try {
				// Initialize the context
				eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
				log.info("Started Eureka Server");

				publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
				EurekaServerInitializerConfiguration.this.running = true;
				publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
			}
			catch (Exception ex) {
				// Help!
				log.error("Could not initialize Eureka servlet context", ex);
			}
		}).start();
	}

	// EurekaServer configuration
	private EurekaServerConfig getEurekaServerConfig() {
		return this.eurekaServerConfig;
	}

	private void publish(ApplicationEvent event) {
		this.applicationContext.publishEvent(event);
	}

	@Override
	public void stop() {
		this.running = false;
		eurekaServerBootstrap.contextDestroyed(this.servletContext);
	}

	@Override
	public boolean isRunning() {
		return this.running;
	}

	// Lifecycle phase
	@Override
	public int getPhase() {
		return 0;
	}

	// Whether to start automatically; true here
	@Override
	public boolean isAutoStartup() {
		return true;
	}

	@Override
	public int getOrder() {
		return this.order;
	}

}

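EurekaServerInitializerConfiguration implements SmartLifecycle, which in turn extends the Lifecycle interface. The main work happens in start(), which launches the EurekaServer on a separate thread.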

Step 3: Who calls EurekaServerInitializerConfiguration.start()?

When Spring initializes the application context, AbstractApplicationContext.refresh() does the main work, including dependency injection and lifecycle handling:

public abstract class AbstractApplicationContext extends DefaultResourceLoader
		implements ConfigurableApplicationContext {
	@Override
	public void refresh() throws BeansException, IllegalStateException {
		...
		finishRefresh();
	}
	protected void finishRefresh() {
		...
		// Propagate the refresh to the lifecycle processor
		getLifecycleProcessor().onRefresh();
		...
	}
}

LifecycleProcessor is an interface. Its default implementation is the DefaultLifecycleProcessor class, which implements onRefresh():

	@Override
	public void onRefresh() {
		startBeans(true);
		this.running = true;
	}
	private void startBeans(boolean autoStartupOnly) {
		Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
		Map<Integer, LifecycleGroup> phases = new TreeMap<>();

		lifecycleBeans.forEach((beanName, bean) -> {
			if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
				int phase = getPhase(bean);
				phases.computeIfAbsent(
						phase,
						p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
				).add(beanName, bean);
			}
		});
		if (!phases.isEmpty()) {
			phases.values().forEach(LifecycleGroup::start);
		}
	}

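The startBeans() method works roughly as follows:

  1. Collect all beans that implement the Lifecycle interface
  2. Keep only SmartLifecycle instances whose isAutoStartup() returns true (onRefresh() passes autoStartupOnly = true)
  3. Group them by getPhase() into a TreeMap, with the phase as key and a LifecycleGroup as value
  4. Iterate over the map and call start() on each LifecycleGroup
  5. This eventually calls bean.start(), which here is EurekaServerInitializerConfiguration.start()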
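
The steps above can be sketched without Spring. The following is a minimal illustration, not Spring's implementation: AutoStartable is a hypothetical, simplified stand-in for SmartLifecycle, and the bean(...) factory exists only to make the example easy to exercise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PhasedStartupSketch {
    // Hypothetical, simplified stand-in for Spring's SmartLifecycle.
    interface AutoStartable {
        int getPhase();
        boolean isAutoStartup();
        void start();
    }

    // Group auto-startup beans by phase, then start the groups in ascending phase order.
    static void startAll(List<AutoStartable> beans) {
        Map<Integer, List<AutoStartable>> phases = new TreeMap<>();
        for (AutoStartable bean : beans) {
            if (bean.isAutoStartup()) {
                phases.computeIfAbsent(bean.getPhase(), p -> new ArrayList<>()).add(bean);
            }
        }
        // A TreeMap iterates its keys in ascending order, so lower phases start first.
        for (List<AutoStartable> group : phases.values()) {
            for (AutoStartable bean : group) {
                bean.start();
            }
        }
    }

    // Illustrative helper: a bean that records its name in a log when started.
    static AutoStartable bean(String name, int phase, boolean autoStartup, List<String> log) {
        return new AutoStartable() {
            @Override public int getPhase() { return phase; }
            @Override public boolean isAutoStartup() { return autoStartup; }
            @Override public void start() { log.add(name); }
        };
    }
}
```

The TreeMap is what gives the ordering: beans with autoStartup set to false are skipped, and the rest start from the lowest phase upward.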

At this point @EnableEurekaServer has essentially done its job: it has triggered the EurekaServer initialization code.

EurekaServer Startup Process

Step 1: The EurekaServerInitializerConfiguration.start() method

	@Override
	public void start() {
		new Thread(() -> {
			try {
				// Note: initialize the context
				eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
				log.info("Started Eureka Server");
				// Publish the registry-available event
				publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
				EurekaServerInitializerConfiguration.this.running = true;
				// Publish the server-started event
				publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
			}
			catch (Exception ex) {
				// Help!
				log.error("Could not initialize Eureka servlet context", ex);
			}
		}).start();
	}

The key call is contextInitialized(). The relevant EurekaServerBootstrap code:

	public void contextInitialized(ServletContext context) {
		try {
			initEurekaEnvironment();
			// Note: initialize the EurekaServer context
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

	protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

		// AWS-specific handling
		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry,
					this.applicationInfoManager);
			this.awsBinder.start();
		}

		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy the registry from a neighboring eureka node
		int registryCount = this.registry.syncUp();
		// Open for traffic and start the eviction logic
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics
		EurekaMonitors.registerAllStats();
	}

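In short, this method copies the service list from neighboring nodes, starts the eviction logic, and registers all monitoring statistics.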

Step 2: this.registry.syncUp() synchronizes the service list from peer eureka nodes

	// Copy all registry entries from a neighboring DS (discovery server) node
    public int syncUp() {
        
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // Fetch the application list data for all nodes
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        // Register the instance if it is registerable
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
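
The retry structure of syncUp() is easier to see in isolation. Below is a minimal sketch, assuming a hypothetical copyOnce supplier that stands in for "fetch the applications and register each registerable instance" and returns how many instances were copied. Unlike the original, the sketch restores the thread's interrupt flag before giving up.

```java
import java.util.function.IntSupplier;

final class SyncRetrySketch {
    // Retry until at least one instance was copied or the retry budget is used up.
    static int syncWithRetry(int maxRetries, long waitMs, IntSupplier copyOnce) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                // Wait before every attempt except the first.
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // assumption: preserve the interrupt flag
                    break;
                }
            }
            count += copyOnce.getAsInt();
        }
        return count;
    }
}
```

The loop stops as soon as one attempt copies anything, so a node that starts alongside healthy peers normally syncs on the first pass, while a node that finds no peers gives up after maxRetries attempts and returns 0.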

Step 3: Eviction logic in this.registry.openForTraffic(this.applicationInfoManager, registryCount);

The call goes to PeerAwareInstanceRegistry (implemented in PeerAwareInstanceRegistryImpl):

    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds
        this.expectedNumberOfClientsSendingRenews = count;
        // Expected minimum number of renewals per minute
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        // AWS-specific handling
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        // Set the EurekaServer instance status to UP
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // Schedule the EvictionTask; by default eviction runs every 60 seconds
        super.postInit();
    }
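
As a rough illustration of what updateRenewsPerMinThreshold() computes, here is a sketch under stated assumptions: the threshold is taken to be the expected number of clients, times the renewals each client sends per minute, times the renewal percent threshold. The method name and parameters below are illustrative, and the exact formula may differ between Eureka versions, so check the source of the version you run.

```java
final class RenewThresholdSketch {
    // Assumed formula: clients * (renewals per client per minute) * percent threshold.
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        double renewsPerClientPerMinute = 60.0 / renewalIntervalSeconds;
        return (int) (expectedClients * renewsPerClientPerMinute * renewalPercentThreshold);
    }
}
```

With 10 clients, a 30-second renewal interval, and a percent threshold of 0.85, the sketch yields 17 renewals per minute.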

The scheduled eviction check task (EvictionTask):

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
	...
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // task: the eviction check task
        // delay: delay before the first run
        // period: interval between runs, 60 seconds by default, configurable
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
}
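
The cancel-then-reschedule pattern in postInit() can be reproduced with java.util.Timer alone. This is a minimal sketch: EvictionTimerSketch and its evict callback are hypothetical names, and the interval is passed in directly instead of being read from a server configuration.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;

final class EvictionTimerSketch {
    private final Timer timer = new Timer("eviction-sketch", true); // daemon thread
    private final AtomicReference<TimerTask> taskRef = new AtomicReference<>();

    // Cancel any previously scheduled task, then schedule a fresh one.
    void schedule(Runnable evict, long intervalMs) {
        TimerTask previous = taskRef.get();
        if (previous != null) {
            previous.cancel();
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    evict.run();
                } catch (Throwable t) {
                    // Swallow errors so one failed run does not kill the timer thread.
                }
            }
        };
        taskRef.set(task);
        // The same value serves as both the initial delay and the period, as in postInit().
        timer.schedule(task, intervalMs, intervalMs);
    }

    void shutdown() {
        timer.cancel();
    }
}
```

Catching Throwable inside run() matters with java.util.Timer: an exception that escapes a TimerTask terminates the timer thread and no further runs are scheduled.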

The EvictionTask code:

	class EvictionTask extends TimerTask {

        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                // Run the eviction
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
		...
    }
	// Evict expired instances
    public void evict(long additionalLeaseMs) {
        ...
        // Expired leases
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        // Two-level Map: first-level key is the service name, second-level key is the instance name
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                // Evict the instance
                internalCancel(appName, id, false);
            }
        }
    }
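
Two details of evict() deserve a closer look: the cap on how many leases may be removed in one run, and the random choice of which expired leases to remove. The sketch below isolates both. The class and method names are illustrative, and plain list elements stand in for Lease<InstanceInfo>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionLimitSketch {
    // At most (registrySize - registrySize * threshold) leases may be evicted per run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Choose the leases to evict with a partial Knuth (Fisher-Yates) shuffle.
    static <T> List<T> pickToEvict(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> pool = new ArrayList<>(expired); // copy so the caller's list stays untouched
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        List<T> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Swap a random element from the not-yet-chosen tail into position i.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            chosen.add(pool.get(i));
        }
        return chosen;
    }
}
```

With a registry of 100 instances and a threshold of 0.85, the limit is 15, so even if 20 leases have expired only 15 are removed in that run. Picking them at random spreads the evictions across applications instead of draining one application first.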

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        read.lock();
        try {
            // Get all instances registered under this service name
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // Remove the instance
                leaseToCancel = gMap.remove(id);
            }
            ...
        } finally {
            read.unlock();
        }
		...
        return true;
    }

This concludes the walkthrough of the Eureka server-side source code.

@EnableEurekaClient Auto-Configuration

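On the client side, the @EnableEurekaClient annotation is optional, because Spring Boot auto-configuration discovers and loads the client configuration by default. The entries are declared in spring-cloud-netflix-eureka-client/META-INF/spring.factories: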

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration

org.springframework.boot.Bootstrapper=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapper

There are more entries than on the server side. The key one is EurekaClientAutoConfiguration:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
		"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
	...
}

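As shown, auto-configuration takes effect only when three conditions are met:

  1. @ConditionalOnClass(EurekaClientConfig.class): the EurekaClientConfig class must be on the classpath
  2. @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true): the eureka.client.enabled property must not be false; when the property is not set, the condition still matches
  3. @ConditionalOnDiscoveryEnabled, which is defined as: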
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @ConditionalOnProperty(
        value = {"spring.cloud.discovery.enabled"},
        matchIfMissing = true
    )
    public @interface ConditionalOnDiscoveryEnabled {
    }
    

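    This also matches by default (spring.cloud.discovery.enabled is treated as true when missing), so no configuration is needed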

So by default all conditions are met and the EurekaClient is started automatically.
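
The matchIfMissing behavior of the two property conditions can be sketched in plain Java. This is an illustration of the semantics only, not Spring Boot's implementation: the class name is hypothetical, a Map stands in for the Spring Environment, and the classpath check from @ConditionalOnClass is left out.

```java
import java.util.Map;

final class PropertyConditionSketch {
    // A missing property matches only if matchIfMissing is true;
    // a present property matches unless its value is "false".
    static boolean matches(Map<String, String> env, String key, boolean matchIfMissing) {
        String value = env.get(key);
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }

    // Both property conditions on EurekaClientAutoConfiguration use matchIfMissing = true.
    static boolean eurekaClientEnabled(Map<String, String> env) {
        return matches(env, "eureka.client.enabled", true)
                && matches(env, "spring.cloud.discovery.enabled", true);
    }
}
```

An empty configuration therefore enables the client, and setting either property to false switches the auto-configuration off.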

Summary

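This article analyzed the auto-configuration mechanism behind Eureka and took a brief look at the EurekaServer source code. For more depth, read the source code directly.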
