是由Netflix开源的基于REST的服务治理组件,包含了eureka server 和eureka client。从2012年9月在GitHub上发布1.1.2版本以来,至今已经发布了231次,最新版本为2018年8月份发布的1.9.4版本。期间有进行2.x版本的开发,不过由于各种原因内部已经冻结开发,目前还是以1.x版本为主。Spring Cloud Netflix Eureka是Pivotal公司为了将Netflix Eureka整合于Spring Cloud生态系统提供的版本。
Eureka快速入门
Eureka Server
-
引入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
-
主应用类上添加**@EnableEurekaServer**
-
配置eureka
eureka: client: register-with-eureka: true fetch-registry: true service-url: defaultZone: ${EUREKA_SERVER_ADDRESS:http://localhost:8761/eureka/} server: peer-node-read-timeout-ms: 5000
-
启动

Eureka Client
-
引入客户端依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
-
主应用类添加**@EnableDiscoveryClient**
-
配置
eureka: instance: prefer-ip-address: true client: service-url: defaultZone: ${EUREKA_SERVER_ADDRESS:http://localhost:8761/eureka/}
-
启动客户端
Eureka 高可用
application-peer1.yml
---
server:
port: 8761
eureka:
client:
register-with-eureka: true # 是否向 eureka server 注册
fetch-registry: true # 是否向 eureka server 获取其他客户端注册信息
service-url:
defaultZone: ${EUREKA_SERVER_ADDRESSES:http://localhost:8762/eureka/}
server:
registry-sync-retry-wait-ms: 5000
application-pee2.yml
---
server:
port: 8762
eureka:
client:
register-with-eureka: true # 是否向 eureka server 注册
fetch-registry: true # 是否向 eureka server 获取其他客户端注册信息
service-url:
defaultZone: ${EUREKA_SERVER_ADDRESSES:http://localhost:8761/eureka/}
server:
registry-sync-retry-wait-ms: 5000
客户端配置
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
register-with-eureka: true
fetch-registry: true
启动两个server组成集群,客户配置seviceUrl为集群地址
- 在两个server都没有启动的情况下,启动client会报错,但还是能启动起来的
- 部分server启动的情况下,client是可以注册到集群的并拉取到服务列表
server采用点对点复制方式,高可用主要是在客户端实现的
- 在Client启动之前,如果没有Eureka Server,则可以通过配置eureka.client.backup-registry-impl从备份registry读取关键服务的信息。(一般不会设置,应该是搭建eureka server集群)
- 在client启动之后,若运行时出现Eureka Server全部挂掉的情况:本地内存有localRegion之前获取的数据,在Eureka Server都不可用的情况下,从Server端定时拉取注册信息回来更新的线程CacheRefreshThread会执行失败,本地localRegion信息不会被更新。
- Client端维护了一个Eureka Server的不可用列表,一旦请求发生Connection error或者5xx的情况则会被列入该列表,当该列表的大小超过指定阈值则会重新清空。在重新清空的情况下,Client默认是采用RetryableEurekaHttpClient进行请求,numberOfRetries为3,因此也能够在一定程度保障Client的高可用。
Eureka 设计原理
作为服务注册中心,主要解决以下问题:
-
客户端怎么向服务注册中心注册?
服务启动时,调用Eureka Server的REST API的register方法,去注册该应用实例的信息。
-
服务实例怎么从服务注册中心中剔除
正常情况下,服务在关闭时通过钩子方法或其他生命周期回调方法去调用Eureka Server的REST API的de-register方法,来删除自身服务实例的信息。为了解决异常情况下,服务没有及时从服务中剔除,Eureka Server要求客户端每隔一段时间续约,发送心跳。如果租约超过一定时间没有进行续约操作,Eureka Server端会主动剔除服务。
-
服务实例数据的一致性问题
服务注册中心微服务中重要的组件,在生产环境中必然是集群部署,两个服务注册中心上的服务列表数据怎么保持一致呢?
下面从四个方面来解释:
-
AP优于CP
对于分布式系统来说,一般网络条件相对不可控,出现网络分区是不可避免的,因此系统必须具备分区容忍性。在这个前提下分布式系统的设计则在AP及CP之间进行选择。Eureka是在部署在AWS的背景下设计的,其设计者认为,在云端,特别是在大规模部署的情况下,失败是不可避免的,可能因为Eureka自身部署失败,注册的服务不可用,或者由于网络分区导致服务不可用,因此不能回避这个问题。要拥抱这个问题,就需要Eureka在网络分区的时候,还能够正常提供服务注册及发现功能,因此Eureka选择满足Availability这个特性。在实际生产实践中,服务注册及发现中心保留可用及过期的数据总比丢失掉可用的数据好。
-
Peer to Peer
Eureka Server本身依赖了Eureka Client,也就是每个Eureka Server是作为其他Eureka Server的Client。在单个Eureka Server启动的时候,会有一个syncUp的操作,通过Eureka Client请求其他Eureka Server节点中的一个节点获取注册的应用实例信息,然后复制到其他peer节点。
Eureka Server在执行复制操作的时候,使用HEADER_REPLICATION的http header来将这个请求操作与普通应用实例的正常请求操作区分开来。通过HEADER_REPLICATION来标识是复制请求,这样其他peer节点接收到请求的时候,就不会再对它的peer节点进行复制操作,从而避免死循环。
-
Region及Zone设计
由于Netflix的服务大部分是部署在amazon上,因此Eureka的设计一部分也是基于zmazon的region及zone的基础设施上。
Region可以理解为地理上的分区,如亚洲地区、华北地区;Zone可以理解为region内的具体机房,比如说 region 划分为华北地区,然后华北地区有两个机房,就可以在此 region 之下划分出 zone1、zone2 两个 zone。服务最好是注册到同一zone的注册中心,因为如果不是同一zone,可能心跳检测有会问题。服务调用是优先调用同一zone的服务的。
Eureka Server原生支持了Region及AvailabilityZone,由于资源在Region之间默认是不会复制的,因此Eureka Server的高可用主要就在于Region下面的AvailabilityZone。
Eureka Client支持preferSameZone,也就是获取Eureka Server的serviceUrl优先拉取跟应用实例同处于一个AvailabilityZone的Eureka Server地址列表。一个AvailabilityZone可以设置多个Eureka Server实例,它们之间构成peer节点,然后采用Peer to Peer的复制模式。
-
自我保护模式
在分布式系统中通常是要对服务实例进行存活检测的,在发生网络抖动或暂时不可用时可能会误判, 极端情况可能造成注册中心清空服务列表。因此Eureka使用保护模式。
Eureka Client端与Server端之间有个租约,Client要定时发送心跳来维持这个租约,表示自己还存活着。Eureka通过当前注册的实例数,去计算每分钟应该从应用实例接收到的心跳数,如果最近一分钟接收到的续约的次数小于等于指定阈值的话,则关闭租约失效剔除,禁止定时任务剔除失效的实例,从而保护注册信息。
关闭自我保护模样:
eureka.server.enable-self-preservation=true
-
Eureka 核心类
InstanceInfo
Eureka使用InstanceInfo类代表注册的服务实例。其主要字段有:
字段 | 说明 |
---|---|
instanceId | 实例id |
appName | 应用名 |
ipAddr | 服务的ip地址 |
port | 端口 |
datacenterInfo | DataCenterInfo(Netflix, Amazon, MyOwn) |
leaseInfo | LeaseInfo租约信息 |
Status | InstanceStatus服务实例信息 |
lastUpdatedTimestamp | 上一次更新时间 |
lastDirtyTimestamp | 上一次失效时间 |
metadata | 元数据信息 |
LeaseInfo
Eureka使用LeaseInfo表示应用服务实例的租约信息,其主要字段有:
字段 | 说明 |
---|---|
serviceUpTimestamp | 服务启动的时间戳 |
registrationTimestamp | 服务注册的时间戳 |
lastRenewalTimestamp | 上一次续租时间戳 |
evictionTimestamp | 被剔除的时间戳 |
renewalIntervalInSecs | 客户端续租的间隔周期 |
durationInSecs | 客户端设置的租约的有效时长 |
InstanceStatus
Eureka使用InstanceStatus表示服务实例的状态。InstanceStatus是枚举类,枚举值有:
UP | 服务上线 |
---|---|
DOWN | 服务下线(心跳检测失败) |
STARTING | 服务启动中 |
OUT_OF_SERVICE | 服务关闭(服务自己下线) |
UNKNOWN | 未知 |
ServiceInstance
ServiceInstance是Spring Cloud是service discovery的实例的抽象接口,约定的服务的实例应该有哪些通用信息。由于Spring Cloud Discovery适配了Zookeeper、Consul、Netflix Eureka等注册中心,因此其ServiceInstance定义更为抽象和通用,而且采取的是定义方法的方式。EurekaRegistration实现了ServiceInstance接口。
LeaseManager
方法 | 作用 |
---|---|
void register(T r, int leaseDuration, boolean isReplication) | 注册新服务 |
boolean cancel(String appName, String id, boolean isReplication) | 删除服务 |
boolean renew(String appName, String id, boolean isReplication) | 服务续约 |
void evict() | 剔除过期的服务,server使用 |
LookupService
方法 | 作用 |
---|---|
Application getApplication(String appName); | 根据appName查找应用 |
Applications getApplications(); | 查询所有应用 |
List |
根据id查找服务实例 |
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure); |
Eureka 核心参数
Server与Client共用参数
参数 | 默认值 | 说明 |
---|---|---|
eureka.instance.instanceId | eureka实例id | |
eureka.instance.preferIpAddress | flase | 是否优先使用ip来代替hostname作为实例的hostName字段的值 |
eureka.instance.metadataMap | eureka实例的元数据 | |
eureka.instance.leaseRenewalIntervalInSeconds | 30 | eureka client发送心跳的周期 |
eureka.instance.leaseExpirationDurationInSeconds | 90 | eureka server等待client续约的时间 |
更多参数见EurekaInstanceConfigBean
Server端(见EurekaServerConfigBean)
参数 | 默认值 | 说明 |
---|---|---|
eureka.server.enableSelfPreservation | True | 是否启用保护模式 |
eureka.server.renewalPercentThreshold | 0.85 | 指定需要收到续租的阀值 |
eureka.server.renewalThresholdUpdateIntervalMs | 15分钟 | 指定了renewalThresholdUpdate定时任务的调度频率 |
eureka.server.evictionIntervalTimerInMs | 60 * 1000 | 指定了EvictionTask的调度频率,用于剔除过期的服务 |
eureka.server.useReadOnlyResponseCache | True | 是否使用只读的response cache |
eureka.server.responseCacheUpdateIntervalMs | 30s | 设置CacheUpdateTask的调度时间间隔 |
eureka.server.responseCacheAutoExpirationInSeconds | readWriteCacheMapr的expireAfterWrite,指定写入多常时间过期 | |
eureka.server.peerEurekaNodesUpdateIntervalMs | 10分钟 | 指定PeerUpdateTask的时间间隔 |
eureka.server.peerEurekaStatusRefreshTimeIntervalMs | 30s | 指定更新peer状态的时间隔 |
eureka.server.peerNodeConnectTimeoutMs | 200ms | peer节点连接超时,单位ms |
eureka.server.peerNodeReadTimeoutMs | 200ms | peer节点读超时 |
eureka.server.peerNodeTotalConnections | 1000 | 连接池最大连接数 |
更多配置见于EurekaServerConfigBean
Client端
参数 | 默认值 | 说明 |
---|---|---|
eureka.client.availabilityZones | 告知client有哪些region及zone | |
eureka.client.region | us-east-1 | 指定该实例所在的region |
eureka.client.registerWithEureka | true | 是否向eureka server注册 |
eureka.client.preferSameZoneEureka | true | 是否优先向同一zone的server注册或者拉取服务列表信息 |
eureka.client.filterOnlyUpInstances | true | 是否过滤出InstanceStatus为UP的服务实例 |
eureka.client.serviceUrl | eureka server 的地址 | |
eureka.client.eurekaServerReadTimeoutSeconds | 8s | 读超时时间 |
eureka.client.eurekaServerConnectTimeoutSeconds | 5s | 连接eureka server的超时时间 |
eureka.instance.metadataMap | eureka 实例的元数据信息 |
更多配置见于EurekaClientConfigBean
Eureka REST端点
appID: 应用名称
instanceID:应用实例的id
操作 | Http action |
---|---|
服务注册 | POST /eureka/apps/appID |
服务下线 | DELETE /eureka/apps/appID/instanceID |
发送心跳 | PUT /eureka/apps/appID/instanceID |
查询所有appId的实例 | GET /eureka/apps/appID |
查询 实例的信息 | GET /eureka/apps/appID/instanceID |
更多见https://github.com/Netflix/eureka/wiki/Eureka-REST-operations
调用endpoint一个例子:

Eureka 源码
Server端
启动过程
-
注解@EnableEurekaServer引入EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration中生成一个名为Maker的Bean,其作用是EurekaServerAutoConfiguration生效
-
EurekaServerAutoConfiguration中又引入了EurekaServerInitializerConfiguration配置,EurekaServerInitializerConfiguration中的start方法启动了eureka server
@Override public void start() { new Thread(new Runnable() { @Override public void run() { try { // TODO: is this class even needed now? eurekaServerBootstrap.contextInitialized( EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }
-
eurekaServerBootstrap.contextInitialized中完成初始化eureka环境变量,eureka上下文
public void contextInitialized(ServletContext context) { try { initEurekaEnvironment(); initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
-
initEurekaServerContext启动Eureka Server
protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = this.registry.syncUp(); // 拉取服务信息 this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }
剔除服务
server端剔除服务实例是由一个位于AbstractInstanceRegistry中名为evictionTimer的Timer定时器来执行EvictionTask任务。
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false); // 注销服务实例
}
}
}
Client端
服务注册
-
从EnableDiscoveryClient注解开始
由注解@EnableDiscoveryClient引入AutoServiceRegistrationConfiguration,AutoServiceRegistrationConfiguration开启通过配置spring.cloud.service-registry.auto-registration.enabled,并创建Marker Bean实例
-
EurekaClientAutoConfiguration类创建EurekaAutoServiceRegistration Bean
@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration( ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); }
EurekaAutoServiceRegistration是实现了SmartLifecycle接口,start方法完成初始化
@Override public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); // 实际上没有注册,只是get信息 this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true); } }
-
CloudEurekaClient的初始化
@Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); }
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); // 准备访问eureka server的客户端 AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 获取已经注册的服务 fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { // 如果是否在初始始化时向Eureka Server注册 throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); // 初始化定时任务 try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
-
向eureka server注册是由instanceInfoReplicator的start方法,这时默认延迟40s注册
/** * Register with the eureka service by making the appropriate REST call. */ boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
服务续约
心跳检测执行器heartbeatExecutor会定时执行线程HeartbeatThread,会执行renew向eureka server续租
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
服务下线
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
更新服务列表
默认每隔30s从服务端GET一次增量版本信息,然后和本地比较并合并,保证本地能获取到其他节点最新注册信息。
TimedSupervisorTask每隔30s执行CacheRefreshThread线程
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
Eureka 性能
https://www.toutiao.com/i6661231047201522183/
http://springcloud.cn/view/31
参考以上两个
从其测试的来看一个Eureka最多支持多少个eureka实例是由操作系统、CPU线程、内存,应用容器连接数等限制的。
问题
- 一个Eureka Server最支持多少个eureka client? 要看具体的环境操作系统、CPU线程、内存,应用容器连接数等限制的
- eureka client的zone与eureka server的zone不一样时,eureka client会向eureka server 注册吗?
- 。。。
Eureka 容器化及k8s部署
---
apiVersion: v1
kind: ConfigMap
metadata:
name: eureka-cm
namespace: hjmos
data:
# if you want to deploy n instances of eureka cluster,
# you should set eureka_service_address: http://eureka-0.eureka:8761/eureka,...,http://eureka-(n-1).eureka:8761/eureka
eureka_service_address: http://eureka-0.eureka:8761/eureka,http://eureka-1.eureka:8761/eureka
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
name: eureka
namespace: hjmos
spec:
serviceName: 'eureka'
replicas: 2
selector:
matchLabels:
app: eureka # 符合目标的Pod有此标签
template:
metadata:
labels:
app: eureka # Pod副本的标签
spec:
containers:
- name: service-registry
image: 10.38.2.12:1000/library/service-registry:1.0.0
ports:
- containerPort: 8761
args:
- --spring.profiles.active=dev
- --management.server.port=8081
- --management.endpoints.web.exposure.include=*
- --management.endpoint.health.show-details=always
- --eureka.instance.health-check-url-path=/actuator/health
env:
- name: EUREKA_SERVER_ADDRESS
valueFrom:
configMapKeyRef:
name: eureka-cm
key: eureka_service_address
eureka-cm 存放eureka server集群信息
---
apiVersion: v1
kind: Service
metadata:
name: eureka
namespace: hjmos
labels:
app: eureka
spec:
clusterIP: None
ports:
- port: 8761
name: eureka
selector:
app: eureka
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
name: service-registry-ingress
annotations:
kubernetes.io/ingress.class: nginx
nginx.ingress.kubernetes.io/rewrite-target: /$2
nginx.ingress.kubernetes.io/use-regex: "true"
nginx.ingress.kubernetes.io/ssl-redirect: "false"
spec:
rules:
- http:
paths:
- path: /service-registry(/|$)(.*)
backend:
serviceName: eureka
servicePort: 8761
---
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
name: eureka-ingress
annotations:
kubernetes.io/ingress.class: nginx
nginx.ingress.kubernetes.io/use-regex: "true"
nginx.ingress.kubernetes.io/ssl-redirect: "false"
spec:
rules:
- http:
paths:
- path: /eureka
backend:
serviceName: eureka
servicePort: 8761
外部访问http://ip:port/service-registry, 最好的形式是ingress使用host
Eureka Client有三种形式可以注册到eureka 集群
-
eureka client和eureka server如果部署在kubernetes的同一个namespace。此时可以使用名为eureka-cm的ConfigMap的eureka_service_address配置数据设置
-
eureka client和eureka server如果部署在同一个kubernetes集群中。此时可以使用headless service的域名地址,这里eureka默认部署在kubernetes的default的命名空间中
eureka: client: serviceUrl: defaultZone: http://eureka-0.eureka.default.svc.cluster.local:8761/eureka/,http://eureka-1.eureka.default.svc.cluster.local:8761/eureka/,http://eureka-2.eureka.default.svc.cluster.local:8761/eureka/
-
http://ip:port/service-registry