搬砖小弟
Spring Cloud - Eureka
2019-11-04 / 26 min read

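Eureka is a REST-based service governance component open-sourced by Netflix, made up of Eureka Server and Eureka Client. Since version 1.1.2 was published on GitHub in September 2012, it has had 231 releases; the latest is 1.9.4, released in August 2018. A 2.x line was under development, but for various reasons its development has been frozen internally, so 1.x remains the main line. Spring Cloud Netflix Eureka is the version Pivotal provides to integrate Netflix Eureka into the Spring Cloud ecosystem.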

Eureka Quick Start

Eureka Server

  1. Add the dependency

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
    
  2. Add **@EnableEurekaServer** to the main application class

  3. Configure Eureka

    eureka:
      client:
        register-with-eureka: true
        fetch-registry: true
        service-url:
          defaultZone: ${EUREKA_SERVER_ADDRESS:http://localhost:8761/eureka/}
      server:
        peer-node-read-timeout-ms: 5000
    
  4. Start the server


Eureka Client

  1. Add the client dependency

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
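  2. Add **@EnableDiscoveryClient** to the main application class (optional since Spring Cloud Edgware when the Eureka client starter is on the classpath)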

  3. Configure

    eureka:
      instance:
        prefer-ip-address: true
      client:
        service-url:
          defaultZone: ${EUREKA_SERVER_ADDRESS:http://localhost:8761/eureka/}
    
  4. Start the client

Eureka High Availability

application-peer1.yml

---
server:
  port: 8761

eureka:
  client:
    register-with-eureka: true # whether to register with the Eureka server
    fetch-registry: true # whether to fetch other clients' registration info from the Eureka server
    service-url:
      defaultZone: ${EUREKA_SERVER_ADDRESSES:http://localhost:8762/eureka/}
  server:
    registry-sync-retry-wait-ms: 5000

application-peer2.yml

---
server:
  port: 8762

eureka:
  client:
    register-with-eureka: true # whether to register with the Eureka server
    fetch-registry: true # whether to fetch other clients' registration info from the Eureka server
    service-url:
      defaultZone: ${EUREKA_SERVER_ADDRESSES:http://localhost:8761/eureka/}
  server:
    registry-sync-retry-wait-ms: 5000

Client configuration

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
    register-with-eureka: true
    fetch-registry: true

Start the two servers to form a cluster and set the client's serviceUrl to the cluster addresses:

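  1. If neither server is running, the client reports errors at startup but still starts.
  2. If only some of the servers are running, the client can still register with the cluster and fetch the service list.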

The servers replicate to one another peer to peer; high availability is mainly achieved on the client side:

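  1. Before the client starts, if no Eureka Server is available, information about key services can be read from a backup registry configured through eureka.client.backup-registry-impl. (This is rarely set; the usual approach is to run a Eureka Server cluster.)
  2. After the client has started, if all Eureka Servers go down at runtime: the localRegion data fetched earlier stays in local memory. While no Eureka Server is available, CacheRefreshThread, the thread that periodically pulls registry information from the servers, fails, so the local localRegion information is no longer updated.
  3. The client keeps a list of unavailable Eureka Servers: a server is added whenever a request to it ends in a connection error or a 5xx response, and the list is cleared once its size exceeds a configured threshold. In addition, the client sends requests through RetryableEurekaHttpClient by default, with numberOfRetries set to 3, which also helps keep the client available to some extent.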
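A minimal Java model of point 3 above: bounded retries across the configured servers plus a quarantine list that is cleared once it grows too large. It is a sketch, not the RetryableEurekaHttpClient source; the class name FailoverEurekaCaller, the status-code function that stands in for real HTTP calls, and the 0.66 clearing ratio used in the example are assumptions chosen for illustration (Java 10+).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of client-side failover; NOT Eureka's RetryableEurekaHttpClient.
class FailoverEurekaCaller {
    private final List<String> serverUrls;
    private final int numberOfRetries;
    private final double quarantineRefreshRatio; // assumed ratio, e.g. 0.66
    private final Set<String> quarantine = new LinkedHashSet<>();

    FailoverEurekaCaller(List<String> serverUrls, int numberOfRetries, double quarantineRefreshRatio) {
        this.serverUrls = List.copyOf(serverUrls);
        this.numberOfRetries = numberOfRetries;
        this.quarantineRefreshRatio = quarantineRefreshRatio;
    }

    /**
     * request maps a server URL to an HTTP status code; a thrown RuntimeException
     * stands for a connection error.
     */
    int execute(Function<String, Integer> request) {
        List<String> candidates = candidates();
        int attempts = Math.min(numberOfRetries, candidates.size());
        for (int i = 0; i < attempts; i++) {
            String url = candidates.get(i);
            try {
                int status = request.apply(url);
                if (status < 500) {
                    return status; // the server answered: no further retries
                }
            } catch (RuntimeException connectionError) {
                // handled below exactly like a 5xx response
            }
            quarantine.add(url); // connection error or 5xx: skip this server next time
        }
        throw new IllegalStateException("No Eureka Server could serve the request");
    }

    Set<String> quarantined() {
        return Collections.unmodifiableSet(quarantine);
    }

    // Servers not in quarantine; the quarantine is cleared once it covers too many servers.
    private List<String> candidates() {
        if (quarantine.size() >= serverUrls.size() * quarantineRefreshRatio) {
            quarantine.clear();
        }
        List<String> result = new ArrayList<>();
        for (String url : serverUrls) {
            if (!quarantine.contains(url)) {
                result.add(url);
            }
        }
        return result;
    }
}
```

With peer1 answering 503, the first call fails over to peer2 and puts peer1 in quarantine; later calls skip peer1 until the quarantine is cleared.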

Eureka Design Principles

As a service registry, Eureka mainly has to solve the following problems:

  1. How does a client register with the service registry?

    When a service starts, it calls the register operation of the Eureka Server REST API to register the instance's information.

  2. How is a service instance removed from the registry?

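    Normally, when a service shuts down, a shutdown hook or another lifecycle callback calls the de-register (cancel) operation of the Eureka Server REST API to delete its instance information. To cover abnormal cases in which a service is not removed in time, the Eureka Server requires each client to renew its lease by sending a heartbeat at a fixed interval. If a lease is not renewed within a certain time, the Eureka Server evicts the instance on its own.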

  3. Consistency of service instance data

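    The service registry is a key component of a microservice system and is always deployed as a cluster in production. How do the service lists held by different registry nodes stay consistent?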

    This is explained from four angles:

    • AP over CP

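      In a distributed system, network conditions are largely out of our control and network partitions are unavoidable, so the system must be partition tolerant. Under that premise, the design has to choose between AP and CP. Eureka was designed for deployment on AWS, and its designers held that in the cloud, especially at large scale, failures are inevitable: Eureka itself may fail to deploy, registered services may become unavailable, or a network partition may make services unreachable. The problem cannot be avoided. Embracing it means Eureka must keep providing service registration and discovery during a partition, so Eureka chooses Availability. In production, a registry that keeps usable but possibly stale data is better than one that loses usable data.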

    • Peer to Peer

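      Eureka Server itself depends on Eureka Client: each Eureka Server acts as a client of the other Eureka Servers. When a Eureka Server starts, it performs a syncUp operation: through its embedded Eureka Client it fetches the registered application instances from one of the other Eureka Server nodes and registers them in its own local registry.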

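      When replicating, a Eureka Server marks the request with the HEADER_REPLICATION HTTP header to distinguish it from ordinary requests sent by application instances. Because HEADER_REPLICATION identifies the request as a replication, a peer that receives it does not replicate it again to its own peers, which avoids an endless replication loop.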

    • Region and Zone design

      Because most of Netflix's services are deployed on Amazon, part of Eureka's design is built on Amazon's region and zone infrastructure.

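      A region can be understood as a geographic area, such as Asia or North China; a zone is a specific data center within a region. For example, if the region is North China and it has two data centers, you can define two zones, zone1 and zone2, under that region. A service should preferably register with a registry in its own zone, because heartbeats across zones may run into problems. Service calls prefer instances in the same zone.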

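      Eureka Server natively supports Region and AvailabilityZone. Because resources are not replicated between regions by default, Eureka Server's high availability relies mainly on the AvailabilityZones within a region.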

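      Eureka Client supports preferSameZone: when resolving the Eureka Server serviceUrls, it first takes the addresses of the Eureka Servers in the same AvailabilityZone as the application instance. One AvailabilityZone can host several Eureka Server instances, which are peers of one another and use peer-to-peer replication.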

    • Self-preservation mode

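      A distributed system usually checks whether service instances are alive, but network jitter or temporary unavailability can lead to false positives; in the extreme case the registry could wipe out its whole service list. That is why Eureka has a self-preservation mode.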

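      A Eureka Client holds a lease with the Eureka Server and must send heartbeats periodically to keep it, signalling that it is still alive. From the number of currently registered instances, Eureka computes how many heartbeats it should receive per minute. If the number of renewals received in the last minute is less than or equal to this threshold, lease expiration is disabled: the scheduled task stops evicting expired instances, which protects the registry.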

      To disable self-preservation mode: eureka.server.enable-self-preservation=false
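The threshold check in the self-preservation paragraph comes down to simple arithmetic. The sketch assumes the formula expected renewals per minute = registered instances × (60 / renewal interval in seconds) × renewalPercentThreshold, which is how I read Eureka 1.9.x; the class and method names are hypothetical.

```java
// Hypothetical helper illustrating the self-preservation arithmetic.
class SelfPreservation {
    /** Renewals per minute the server must see, given the registered instance count. */
    static int renewsPerMinThreshold(int registeredInstances,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        double renewsPerInstancePerMinute = 60.0 / renewalIntervalSeconds; // 2 for a 30 s heartbeat
        return (int) (registeredInstances * renewsPerInstancePerMinute * renewalPercentThreshold);
    }

    /** Eviction is allowed only when self-preservation is off or enough renewals arrived. */
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          int renewsInLastMinute,
                                          int renewsPerMinThreshold) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return renewsPerMinThreshold > 0 && renewsInLastMinute > renewsPerMinThreshold;
    }
}
```

With 10 registered instances, a 30 s heartbeat and the default 0.85, the threshold is 17 renewals per minute: if 17 or fewer arrive in the last minute, eviction is suspended.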

Eureka Core Classes

InstanceInfo

Eureka uses the InstanceInfo class to represent a registered service instance. Its main fields are:

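| Field | Description |
| --- | --- |
| instanceId | Instance ID |
| appName | Application name |
| ipAddr | IP address of the service |
| port | Port |
| dataCenterInfo | DataCenterInfo (Netflix, Amazon, MyOwn) |
| leaseInfo | LeaseInfo, the lease information |
| status | InstanceStatus, the status of the instance |
| lastUpdatedTimestamp | Time of the last update |
| lastDirtyTimestamp | Time the instance information was last marked dirty (changed) |
| metadata | Metadata |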

LeaseInfo

Eureka uses LeaseInfo to represent the lease information of a service instance. Its main fields are:

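| Field | Description |
| --- | --- |
| serviceUpTimestamp | Timestamp when the service came up |
| registrationTimestamp | Timestamp of registration |
| lastRenewalTimestamp | Timestamp of the last lease renewal |
| evictionTimestamp | Timestamp of eviction |
| renewalIntervalInSecs | Interval at which the client renews its lease |
| durationInSecs | Lease duration set by the client |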
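To make the LeaseInfo fields concrete, here is a small lease model: a lease counts as expired once it has been cancelled or when no renewal arrived within durationInSecs. It is illustrative only; LeaseSketch is a hypothetical class, and Eureka's own Lease class differs in details (a comment in its source notes that renew() effectively stretches expiry to about twice the duration).

```java
// Hypothetical lease model built from the LeaseInfo fields; not Eureka's Lease class.
class LeaseSketch {
    final int renewalIntervalInSecs; // heartbeat period chosen by the client (default 30)
    final int durationInSecs;        // how long the server waits for a renewal (default 90)
    long lastRenewalTimestamp;       // epoch millis of the last renewal or of registration
    long evictionTimestamp;          // 0 while the lease is active

    LeaseSketch(int renewalIntervalInSecs, int durationInSecs, long registrationTimestamp) {
        this.renewalIntervalInSecs = renewalIntervalInSecs;
        this.durationInSecs = durationInSecs;
        this.lastRenewalTimestamp = registrationTimestamp;
    }

    /** Called for every heartbeat. */
    void renew(long nowMillis) {
        lastRenewalTimestamp = nowMillis;
    }

    /** Called on de-registration or eviction; the first timestamp wins. */
    void cancel(long nowMillis) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMillis;
        }
    }

    /** Expired once cancelled, or when no renewal arrived within durationInSecs. */
    boolean isExpired(long nowMillis) {
        return evictionTimestamp > 0
                || nowMillis > lastRenewalTimestamp + durationInSecs * 1000L;
    }
}
```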

InstanceStatus

Eureka uses InstanceStatus to represent the status of a service instance. InstanceStatus is an enum with the following values:

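| Value | Meaning |
| --- | --- |
| UP | The instance is up |
| DOWN | The instance is down (health check failed) |
| STARTING | The instance is starting |
| OUT_OF_SERVICE | The instance was taken out of service on purpose (it took itself offline) |
| UNKNOWN | Unknown |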
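A client normally routes traffic only to UP instances. The sketch copies the five values listed above into a local enum and filters a hypothetical InstanceView record by status, which is roughly what eureka.client.filterOnlyUpInstances=true does for the fetched instance list. The type names are hypothetical, and records require Java 16+.

```java
import java.util.List;
import java.util.stream.Collectors;

// Local copy of the listed values; Eureka's real enum is InstanceInfo.InstanceStatus.
enum InstanceStatusSketch { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

// Hypothetical, minimal view of an instance for this example.
record InstanceView(String instanceId, InstanceStatusSketch status) {}

class UpOnlyFilter {
    /** Keeps only instances that should receive traffic, i.e. status UP. */
    static List<InstanceView> onlyUp(List<InstanceView> instances) {
        return instances.stream()
                .filter(instance -> instance.status() == InstanceStatusSketch.UP)
                .collect(Collectors.toList());
    }
}
```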

ServiceInstance

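ServiceInstance is Spring Cloud's abstract interface for an instance in service discovery; it specifies the common information every service instance should expose. Because Spring Cloud Discovery supports ZooKeeper, Consul, Netflix Eureka and other registries, ServiceInstance is more abstract and general, and it is defined as a set of methods. EurekaRegistration implements the ServiceInstance interface.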

LeaseManager

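| Method | Purpose |
| --- | --- |
| `void register(T r, int leaseDuration, boolean isReplication)` | Register a new instance |
| `boolean cancel(String appName, String id, boolean isReplication)` | Remove (cancel) an instance |
| `boolean renew(String appName, String id, boolean isReplication)` | Renew an instance's lease |
| `void evict()` | Evict expired instances; used by the server |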
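The four LeaseManager operations can be sketched on top of the registry shape that the evict() code in the source section iterates over, appName -> (instanceId -> lease). This is a hypothetical in-memory class, not AbstractInstanceRegistry: it takes the current time as a parameter to stay deterministic, ignores isReplication, and its evict() has no self-preservation limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory registry shaped like LeaseManager: appName -> (instanceId -> lease).
class SimpleLeaseRegistry {
    private static final class Lease {
        final String appName;
        final String id;
        final long durationMs;
        long lastRenewalMs;

        Lease(String appName, String id, long durationMs, long nowMs) {
            this.appName = appName;
            this.id = id;
            this.durationMs = durationMs;
            this.lastRenewalMs = nowMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs > lastRenewalMs + durationMs;
        }
    }

    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    // isReplication mirrors the LeaseManager signature but is ignored: there are no peers here.
    void register(String appName, String id, int leaseDurationSecs, boolean isReplication, long nowMs) {
        registry.computeIfAbsent(appName, name -> new ConcurrentHashMap<>())
                .put(id, new Lease(appName, id, leaseDurationSecs * 1000L, nowMs));
    }

    /** Returns false for an unknown instance, which tells the client to register again. */
    boolean renew(String appName, String id, boolean isReplication, long nowMs) {
        Map<String, Lease> leases = registry.get(appName);
        Lease lease = leases == null ? null : leases.get(id);
        if (lease == null) {
            return false;
        }
        lease.lastRenewalMs = nowMs;
        return true;
    }

    boolean cancel(String appName, String id, boolean isReplication) {
        Map<String, Lease> leases = registry.get(appName);
        return leases != null && leases.remove(id) != null;
    }

    /** Removes every expired lease (no self-preservation cap) and returns "app/id" keys. */
    List<String> evict(long nowMs) {
        List<String> evicted = new ArrayList<>();
        for (Map<String, Lease> leases : registry.values()) {
            leases.values().removeIf(lease -> {
                boolean expired = lease.isExpired(nowMs);
                if (expired) {
                    evicted.add(lease.appName + "/" + lease.id);
                }
                return expired;
            });
        }
        return evicted;
    }

    int size() {
        return registry.values().stream().mapToInt(Map::size).sum();
    }
}
```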

LookupService

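| Method | Purpose |
| --- | --- |
| `Application getApplication(String appName);` | Find an application by appName |
| `Applications getApplications();` | Get all applications |
| `List<InstanceInfo> getInstancesById(String id);` | Find service instances by ID |
| `InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);` | Get the next instance for a virtual host name, picked round-robin |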
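The LookupService Javadoc describes getNextServerFromEureka as a round-robin pick over the instances of a virtual host name. A minimal sketch of that idea, with instances reduced to strings and a hypothetical class name:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin lookup keyed by virtual host name; instances are plain strings.
class RoundRobinLookup {
    private final Map<String, List<String>> instancesByVipAddress;
    private final Map<String, AtomicLong> nextIndex = new ConcurrentHashMap<>();

    RoundRobinLookup(Map<String, List<String>> instancesByVipAddress) {
        this.instancesByVipAddress = instancesByVipAddress;
    }

    /** Returns the next instance; the counter is incremented before use, so the first pick is index 1. */
    String nextServer(String virtualHostname) {
        List<String> instances = instancesByVipAddress.getOrDefault(virtualHostname, List.of());
        if (instances.isEmpty()) {
            throw new IllegalStateException("No instances for " + virtualHostname);
        }
        long counter = nextIndex.computeIfAbsent(virtualHostname, name -> new AtomicLong())
                .incrementAndGet();
        return instances.get((int) (counter % instances.size()));
    }
}
```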

Eureka Core Parameters

Parameters shared by server and client

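| Parameter | Default | Description |
| --- | --- | --- |
| eureka.instance.instanceId | | Eureka instance ID |
| eureka.instance.preferIpAddress | false | Whether to use the IP address instead of the hostname as the instance's hostName |
| eureka.instance.metadataMap | | Metadata of the Eureka instance |
| eureka.instance.leaseRenewalIntervalInSeconds | 30 | Interval (seconds) at which the Eureka client sends heartbeats |
| eureka.instance.leaseExpirationDurationInSeconds | 90 | How long (seconds) the Eureka server waits for a renewal before the lease expires |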

See EurekaInstanceConfigBean for more parameters.

Server side (see EurekaServerConfigBean)

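| Parameter | Default | Description |
| --- | --- | --- |
| eureka.server.enableSelfPreservation | true | Whether self-preservation mode is enabled |
| eureka.server.renewalPercentThreshold | 0.85 | Fraction of the expected renewals that must be received |
| eureka.server.renewalThresholdUpdateIntervalMs | 15 min | Schedule interval of the renewalThresholdUpdate task |
| eureka.server.evictionIntervalTimerInMs | 60 * 1000 | Schedule interval of EvictionTask, which evicts expired instances |
| eureka.server.useReadOnlyResponseCache | true | Whether to use the read-only response cache |
| eureka.server.responseCacheUpdateIntervalMs | 30 s | Schedule interval of CacheUpdateTask |
| eureka.server.responseCacheAutoExpirationInSeconds | 180 | expireAfterWrite of readWriteCacheMap: how long after being written an entry expires |
| eureka.server.peerEurekaNodesUpdateIntervalMs | 10 min | Interval of PeerUpdateTask, which updates the peer node list |
| eureka.server.peerEurekaStatusRefreshTimeIntervalMs | 30 s | Interval for refreshing peer status |
| eureka.server.peerNodeConnectTimeoutMs | 200 ms | Connect timeout to peer nodes |
| eureka.server.peerNodeReadTimeoutMs | 200 ms | Read timeout to peer nodes |
| eureka.server.peerNodeTotalConnections | 1000 | Maximum number of connections in the pool |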

See EurekaServerConfigBean for more settings.
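useReadOnlyResponseCache and responseCacheUpdateIntervalMs describe a two-level cache: register and cancel invalidate the read-write level (readWriteCacheMap), while reads are served from a read-only level that a timer refreshes. The sketch models that behavior with two maps and a manual syncReadOnly() call in place of the timer; it is a simplification (no expireAfterWrite, no compression), and the class name is hypothetical. It shows why a newly registered instance can take up to responseCacheUpdateIntervalMs to appear in the server's responses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical two-level response cache; no Guava, no expiry, no timer thread.
class TwoLevelResponseCache {
    private final Function<String, String> payloadBuilder; // renders a response from the registry
    private final boolean useReadOnlyResponseCache;
    private final Map<String, String> readWriteCache = new ConcurrentHashMap<>();
    private final Map<String, String> readOnlyCache = new ConcurrentHashMap<>();

    TwoLevelResponseCache(Function<String, String> payloadBuilder, boolean useReadOnlyResponseCache) {
        this.payloadBuilder = payloadBuilder;
        this.useReadOnlyResponseCache = useReadOnlyResponseCache;
    }

    /** Read path: read-only level first, filled from the read-write level on a miss. */
    String get(String key) {
        if (!useReadOnlyResponseCache) {
            return readWriteCache.computeIfAbsent(key, payloadBuilder);
        }
        return readOnlyCache.computeIfAbsent(key, k -> readWriteCache.computeIfAbsent(k, payloadBuilder));
    }

    /** Called when the registry changes (register, cancel, status update). */
    void invalidate(String key) {
        readWriteCache.remove(key);
    }

    /** What the periodic task does every responseCacheUpdateIntervalMs. */
    void syncReadOnly() {
        for (String key : readOnlyCache.keySet()) {
            readOnlyCache.put(key, readWriteCache.computeIfAbsent(key, payloadBuilder));
        }
    }
}
```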

Client side

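| Parameter | Default | Description |
| --- | --- | --- |
| eureka.client.availabilityZones | | Tells the client which zones exist in each region |
| eureka.client.region | us-east-1 | Region the instance belongs to |
| eureka.client.registerWithEureka | true | Whether to register with the Eureka Server |
| eureka.client.preferSameZoneEureka | true | Whether to prefer Eureka Servers in the same zone for registering and fetching the service list |
| eureka.client.filterOnlyUpInstances | true | Whether to keep only instances whose InstanceStatus is UP |
| eureka.client.serviceUrl | | Eureka Server addresses |
| eureka.client.eurekaServerReadTimeoutSeconds | 8 s | Read timeout |
| eureka.client.eurekaServerConnectTimeoutSeconds | 5 s | Timeout for connecting to the Eureka Server |
| eureka.instance.metadataMap | | Metadata of the Eureka instance |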

See EurekaClientConfigBean for more settings.
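preferSameZoneEureka affects the order in which the client tries Eureka Servers. The helper puts the client's own zone first and then walks the remaining zones in configured order, wrapping around. This mirrors my understanding of Eureka's EndpointUtils but is a hypothetical sketch; the map argument stands in for the eureka.client.serviceUrl.<zone> settings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: order Eureka Server URLs by zone for preferSameZoneEureka.
class ZoneAwareServiceUrls {
    /**
     * availabilityZones: zones of the client's region, in configured order.
     * serviceUrlsByZone: zone -> Eureka Server URLs.
     */
    static List<String> orderedServiceUrls(List<String> availabilityZones,
                                           Map<String, List<String>> serviceUrlsByZone,
                                           String myZone,
                                           boolean preferSameZone) {
        int zoneCount = availabilityZones.size();
        // Start from the client's own zone when preferred; an unknown zone falls back to the first one.
        int start = preferSameZone ? Math.max(0, availabilityZones.indexOf(myZone)) : 0;
        List<String> urls = new ArrayList<>();
        for (int i = 0; i < zoneCount; i++) {
            String zone = availabilityZones.get((start + i) % zoneCount);
            urls.addAll(serviceUrlsByZone.getOrDefault(zone, List.of()));
        }
        return urls;
    }
}
```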

Eureka REST Endpoints

appID: application name

instanceID: ID of the application instance

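| Operation | HTTP action |
| --- | --- |
| Register an instance | POST /eureka/apps/appID |
| De-register an instance | DELETE /eureka/apps/appID/instanceID |
| Send a heartbeat | PUT /eureka/apps/appID/instanceID |
| Query all instances of an appID | GET /eureka/apps/appID |
| Query a specific instance | GET /eureka/apps/appID/instanceID |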

See https://github.com/Netflix/eureka/wiki/Eureka-REST-operations for more.

An example of calling an endpoint:
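As a sketch, the operations in the table can be issued with the JDK HTTP client (Java 11+). The base URL http://localhost:8761/eureka and the class name are assumptions; the registration body must be an InstanceInfo JSON document, which is left to the caller. The Netflix wiki writes these paths under /eureka/v2, while a Spring Cloud Eureka Server serves them under /eureka as in the table.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that builds requests for the Eureka REST operations in the table.
class EurekaRestRequests {
    private final String baseUrl; // assumed, e.g. "http://localhost:8761/eureka"

    EurekaRestRequests(String baseUrl) {
        // strip a trailing slash so paths can be appended uniformly
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    /** POST /eureka/apps/appID with an InstanceInfo JSON body. */
    HttpRequest register(String appId, String instanceInfoJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/" + appId))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceInfoJson))
                .build();
    }

    /** PUT /eureka/apps/appID/instanceID (heartbeat). */
    HttpRequest heartbeat(String appId, String instanceId) {
        return HttpRequest.newBuilder(instanceUri(appId, instanceId))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** DELETE /eureka/apps/appID/instanceID (de-register). */
    HttpRequest cancel(String appId, String instanceId) {
        return HttpRequest.newBuilder(instanceUri(appId, instanceId)).DELETE().build();
    }

    /** GET /eureka/apps/appID, asking for a JSON response. */
    HttpRequest getApp(String appId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/" + appId))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Sends a request and returns the HTTP status; requires a running Eureka Server. */
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    private URI instanceUri(String appId, String instanceId) {
        return URI.create(baseUrl + "/apps/" + appId + "/" + instanceId);
    }
}
```

Against a running server, EurekaRestRequests.send(rest.getApp("ORDER-SERVICE")) should return 200 for a registered application; a successful registration is expected to answer 204, which is what the client's register() code further down checks.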

Eureka Source Code

Server side

Startup process

  1. The @EnableEurekaServer annotation imports EurekaServerMarkerConfiguration, which creates a Bean named Marker; the presence of this Bean activates EurekaServerAutoConfiguration

  2. EurekaServerAutoConfiguration in turn imports the EurekaServerInitializerConfiguration configuration, whose start method starts the Eureka Server:

    @Override
    public void start() {
       new Thread(new Runnable() {
          @Override
          public void run() {
             try {
                // TODO: is this class even needed now?
                eurekaServerBootstrap.contextInitialized(
                      EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
    
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
             }
             catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
             }
          }
       }).start();
    }
    
  3. eurekaServerBootstrap.contextInitialized initializes the Eureka environment and the Eureka server context:

    public void contextInitialized(ServletContext context) {
       try {
          initEurekaEnvironment();
          initEurekaServerContext();
    
          context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
       }
       catch (Throwable e) {
          log.error("Cannot bootstrap eureka server :", e);
          throw new RuntimeException("Cannot bootstrap eureka server :", e);
       }
    }
    
    
  4. initEurekaServerContext starts the Eureka Server:

    protected void initEurekaServerContext() throws Exception {
       // For backward compatibility
       JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
             XStream.PRIORITY_VERY_HIGH);
       XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
             XStream.PRIORITY_VERY_HIGH);
    
       if (isAws(this.applicationInfoManager.getInfo())) {
          this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
          this.awsBinder.start();
       }
    
       EurekaServerContextHolder.initialize(this.serverContext);
    
       log.info("Initialized server context");
    
       // Copy registry from neighboring eureka node
       int registryCount = this.registry.syncUp();  // copy registry information from a peer node
       this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
       // Register all monitoring statistics.
       EurekaMonitors.registerAllStats();
    }
    

Evicting instances

On the server side, instances are evicted by EvictionTask, which a Timer named evictionTimer in AbstractInstanceRegistry runs periodically.

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false); // cancel (de-register) the instance
        }
    }
}
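The two ideas in evict() can be isolated and tested without a server: the per-run eviction cap registrySize - (int) (registrySize * renewalPercentThreshold), and the partial Fisher-Yates (Knuth) shuffle that picks which expired leases go. EvictionPlanner is a hypothetical name; the arithmetic follows the code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper isolating the eviction cap and random selection from evict().
class EvictionPlanner {
    /** Same arithmetic as evict(): registrySize - (int) (registrySize * renewalPercentThreshold). */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /** Picks min(expired, limit) leases uniformly at random via a partial Knuth shuffle. */
    static <T> List<T> chooseToEvict(List<T> expired, int registrySize,
                                     double renewalPercentThreshold, Random random) {
        List<T> candidates = new ArrayList<>(expired); // do not reorder the caller's list
        int toEvict = Math.max(0, Math.min(candidates.size(),
                evictionLimit(registrySize, renewalPercentThreshold)));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(candidates.size() - i); // index in [i, size)
            Collections.swap(candidates, i, next);
        }
        return new ArrayList<>(candidates.subList(0, toEvict));
    }
}
```

With 20 registered instances and the default 0.85, at most 3 instances are evicted per run even if 10 leases have expired.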

Client side

Service registration

  1. Starting from the @EnableDiscoveryClient annotation

    The @EnableDiscoveryClient annotation imports AutoServiceRegistrationConfiguration, which is switched on through the property spring.cloud.service-registry.auto-registration.enabled and creates a Marker Bean instance.

  2. EurekaClientAutoConfiguration creates the EurekaAutoServiceRegistration Bean

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
          ApplicationContext context, EurekaServiceRegistry registry,
          EurekaRegistration registration) {
       return new EurekaAutoServiceRegistration(context, registry, registration);
    }
    
    

    EurekaAutoServiceRegistration implements the SmartLifecycle interface; its start method performs the initialization:

    @Override
    public void start() {
       // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
       if (this.port.get() != 0) {
          if (this.registration.getNonSecurePort() == 0) {
             this.registration.setNonSecurePort(this.port.get());
          }
    
          if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
             this.registration.setSecurePort(this.port.get());
          }
       }
    
       // only initialize if nonSecurePort is greater than 0 and it isn't already running
       // because of containerPortInitializer below
       if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
    
          this.serviceRegistry.register(this.registration); // does not actually send the registration request here; it only gets/initializes client information
    
          this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                this.registration.getInstanceConfig()));
          this.running.set(true);
       }
    }
    
    
  3. Initialization of CloudEurekaClient

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
          EurekaClientConfig config) {
       return new CloudEurekaClient(manager, config, this.optionalArgs,
             this.context);
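    }
    
    CloudEurekaClient extends DiscoveryClient; the DiscoveryClient constructor performs most of the initialization:
    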
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
    
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
    
        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
    
        fetchRegistryGeneration = new AtomicLong(0);
    
        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
    
            return;  // no need to setup up an network tasks and we are done
        }
    
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
    
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args); // set up the clients used to call the Eureka Server
    
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
    
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // fetch the services that are already registered
            fetchRegistryFromBackup();
        }
    
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
    
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) { // register with the Eureka Server during initialization
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
    
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();  // initialize the scheduled tasks
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
    
        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }
    
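  4. Registration with the Eureka Server is driven by the start method of instanceInfoReplicator, which by default delays registration by 40 s (initialInstanceInfoReplicationIntervalSeconds):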

    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
    
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
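The delayed first registration can be modeled with a ScheduledExecutorService. This is a sketch under assumptions, not InstanceInfoReplicator: DelayedRegistrar is hypothetical, the BooleanSupplier stands in for DiscoveryClient.register(), the thread is a daemon like the DiscoveryClient executors, and the test uses a 200 ms delay instead of the 40 s default.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical class showing a delayed, start-once registration.
class DelayedRegistrar {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier register; // stands in for DiscoveryClient.register()
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private volatile ScheduledFuture<?> pending;

    DelayedRegistrar(ScheduledExecutorService scheduler, BooleanSupplier register) {
        this.scheduler = scheduler;
        this.register = register;
    }

    /** Uses a daemon thread so the JVM can exit without an explicit shutdown. */
    static DelayedRegistrar withDaemonScheduler(BooleanSupplier register) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "instance-info-replicator-sketch");
            thread.setDaemon(true);
            return thread;
        });
        return new DelayedRegistrar(scheduler, register);
    }

    /** Schedules the first registration after initialDelay; later calls are ignored. */
    boolean start(long initialDelay, TimeUnit unit) {
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        pending = scheduler.schedule(() -> registered.set(register.getAsBoolean()), initialDelay, unit);
        return true;
    }

    boolean isRegistered() {
        return registered.get();
    }

    /** Waits for the scheduled registration; false on timeout, failure or interruption. */
    boolean awaitRegistered(long timeout, TimeUnit unit) {
        ScheduledFuture<?> future = pending;
        if (future == null) {
            return false;
        }
        try {
            future.get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
        return registered.get();
    }
}
```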
    

Lease renewal

The heartbeat executor heartbeatExecutor periodically runs HeartbeatThread, which calls renew() to renew the lease with the Eureka Server:

/**
 * Renew with the eureka service by making the appropriate REST call
 */
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
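The decision logic of renew() can be exercised without HTTP: 200 means renewed, 404 means the server no longer knows the instance (for example after eviction) so the client registers again, and any exception is reported as a failed heartbeat. HeartbeatSketch and its two suppliers are hypothetical stand-ins for sendHeartBeat() and register().

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical model of renew(): no HTTP, only the status-code decisions.
class HeartbeatSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;

    private final IntSupplier sendHeartbeat; // stands in for PUT /eureka/apps/appID/instanceID
    private final BooleanSupplier register;  // stands in for register(), true on success (204)
    private int reRegistrations;

    HeartbeatSketch(IntSupplier sendHeartbeat, BooleanSupplier register) {
        this.sendHeartbeat = sendHeartbeat;
        this.register = register;
    }

    boolean renew() {
        int status;
        try {
            status = sendHeartbeat.getAsInt();
        } catch (RuntimeException e) {
            return false; // e.g. connection refused: report failure, retry on the next tick
        }
        if (status == NOT_FOUND) {
            reRegistrations++; // the server lost this instance: register it again
            return register.getAsBoolean();
        }
        return status == OK;
    }

    int reRegistrations() {
        return reRegistrations;
    }
}
```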

De-registration (shutdown)

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }

        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}
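The order of operations in shutdown() can be checked with a small model that records each step instead of performing it. ShutdownSketch is hypothetical; it keeps the two points that matter: compareAndSet makes shutdown idempotent, and the DELETE is sent only when the client registered and shouldUnregisterOnShutdown is true. Because shutdown() is triggered by @PreDestroy, a process that is killed abruptly never sends the DELETE and is removed only when its lease expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of DiscoveryClient.shutdown(): steps are recorded, not executed.
class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final boolean shouldRegisterWithEureka;
    private final boolean shouldUnregisterOnShutdown;
    private final List<String> steps = new ArrayList<>();

    ShutdownSketch(boolean shouldRegisterWithEureka, boolean shouldUnregisterOnShutdown) {
        this.shouldRegisterWithEureka = shouldRegisterWithEureka;
        this.shouldUnregisterOnShutdown = shouldUnregisterOnShutdown;
    }

    synchronized void shutdown() {
        if (!isShutdown.compareAndSet(false, true)) {
            return; // already shut down: later calls do nothing
        }
        steps.add("cancelScheduledTasks"); // stop heartbeat and cache-refresh tasks
        if (shouldRegisterWithEureka && shouldUnregisterOnShutdown) {
            steps.add("setInstanceStatus(DOWN)");
            steps.add("unregister"); // DELETE /eureka/apps/appID/instanceID
        }
        steps.add("eurekaTransport.shutdown");
    }

    synchronized List<String> steps() {
        return List.copyOf(steps);
    }
}
```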

Refreshing the service list

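By default, every 30 s the client GETs the incremental (delta) registry information from the server, compares it with its local copy and merges the two, so that it always has the latest registrations from other nodes.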
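The delta merge can be sketched as follows: apply ADDED, MODIFIED and DELETED changes to a local map, then compare a status-count hash with the one sent by the server and fall back to a full fetch on mismatch. The hash format (for example DOWN_1_UP_2_) is modeled on Eureka's apps hash code as I understand it; DeltaMergeSketch and its nested types are hypothetical, and records require Java 16+.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of delta merging; instances are reduced to instanceId -> status.
class DeltaMergeSketch {
    enum ActionType { ADDED, MODIFIED, DELETED }

    record Change(ActionType action, String instanceId, String status) {}

    private final Map<String, String> localStatuses = new ConcurrentHashMap<>();

    /** Full fetch: replace the local copy entirely. */
    void replaceAll(Map<String, String> fullRegistry) {
        localStatuses.clear();
        localStatuses.putAll(fullRegistry);
    }

    /** Applies a delta; returns false when the hashes differ and a full fetch is needed. */
    boolean applyDelta(List<Change> delta, String serverAppsHashCode) {
        for (Change change : delta) {
            switch (change.action()) {
                case ADDED, MODIFIED -> localStatuses.put(change.instanceId(), change.status());
                case DELETED -> localStatuses.remove(change.instanceId());
            }
        }
        return appsHashCode().equals(serverAppsHashCode);
    }

    /** Status counts in sorted order, e.g. "DOWN_1_UP_2_". */
    String appsHashCode() {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : localStatuses.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        counts.forEach((status, count) -> hash.append(status).append('_').append(count).append('_'));
        return hash.toString();
    }
}
```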

TimedSupervisorTask runs the CacheRefreshThread every 30 s:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
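TimedSupervisorTask does not simply rerun the task every 30 s: when a run times out, the next delay doubles, up to the interval multiplied by an exponential back-off bound, and a successful run resets it. The bookkeeping is sketched below; BackoffDelay is hypothetical, and the bound of 10 used in the example is assumed to match the client's cacheRefreshExecutorExponentialBackOffBound default.

```java
// Hypothetical class tracking the next delay of a supervised periodic task.
class BackoffDelay {
    private final long intervalMs; // normal period, e.g. registryFetchIntervalSeconds = 30 s
    private final long maxDelayMs; // intervalMs * expBackOffBound
    private long currentDelayMs;

    BackoffDelay(long intervalMs, int expBackOffBound) {
        this.intervalMs = intervalMs;
        this.maxDelayMs = intervalMs * expBackOffBound;
        this.currentDelayMs = intervalMs;
    }

    /** A run finished in time: go back to the normal interval. */
    long onSuccess() {
        currentDelayMs = intervalMs;
        return currentDelayMs;
    }

    /** A run timed out: double the delay, capped at maxDelayMs. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }
}
```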

Eureka Performance

https://www.toutiao.com/i6661231047201522183/

http://springcloud.cn/view/31

See the two articles above.

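Judging from the tests in these articles, the number of instances a single Eureka Server can support is limited by the operating system, CPU threads, memory, the application container's connection limit, and so on.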

Open questions

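  1. How many Eureka clients can a single Eureka Server support at most? It depends on the specific environment: operating system, CPU threads, memory, the application container's connection limit, and so on.
  2. If a Eureka client's zone differs from the Eureka Server's zone, will the client still register with that server?
  3. ...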

Containerizing Eureka and Deploying It on Kubernetes

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: eureka-cm
  namespace: hjmos
data:
  # if you want to deploy n instances of eureka cluster,
  # you should set eureka_service_address: http://eureka-0.eureka:8761/eureka,...,http://eureka-(n-1).eureka:8761/eureka
  eureka_service_address: http://eureka-0.eureka:8761/eureka,http://eureka-1.eureka:8761/eureka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: eureka
  namespace: hjmos
spec:
  serviceName: 'eureka'
  replicas: 2
  selector:
    matchLabels:
      app: eureka # Pods matched by this selector carry this label
  template:
    metadata:
      labels:
        app: eureka  # label applied to the Pod replicas
    spec:
      containers:
        - name: service-registry
          image: 10.38.2.12:1000/library/service-registry:1.0.0
          ports:
            - containerPort: 8761
          args:
            - --spring.profiles.active=dev
            - --management.server.port=8081
            - --management.endpoints.web.exposure.include=*
            - --management.endpoint.health.show-details=always
            - --eureka.instance.health-check-url-path=/actuator/health
          env:
            - name: EUREKA_SERVER_ADDRESS
              valueFrom:
                configMapKeyRef:
                  name: eureka-cm
                  key: eureka_service_address

The ConfigMap eureka-cm stores the Eureka Server cluster addresses.
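The eureka_service_address value follows the stable pod DNS names that a StatefulSet gets behind a headless Service (pod i is eureka-i.eureka inside the namespace). A small generator makes the pattern explicit for n replicas; EurekaPeerUrls is hypothetical, and the fully qualified form assumes the default cluster domain cluster.local.

```java
import java.util.StringJoiner;

// Hypothetical generator for the comma-separated defaultZone / eureka_service_address value.
class EurekaPeerUrls {
    /**
     * namespace == null gives the short form "eureka-0.eureka", usable inside the same namespace;
     * otherwise the fully qualified form, assuming the default cluster domain cluster.local.
     */
    static String defaultZone(String statefulSet, String headlessService, String namespace,
                              int replicas, int port) {
        StringJoiner urls = new StringJoiner(",");
        for (int i = 0; i < replicas; i++) {
            String host = statefulSet + "-" + i + "." + headlessService
                    + (namespace == null ? "" : "." + namespace + ".svc.cluster.local");
            urls.add("http://" + host + ":" + port + "/eureka");
        }
        return urls.toString();
    }
}
```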

---
apiVersion: v1
kind: Service
metadata:
  name: eureka
  namespace: hjmos
  labels:
    app: eureka
spec:
  clusterIP: None
  ports:
    - port: 8761
      name: eureka
  selector:
    app: eureka

---
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: service-registry-ingress
  namespace: hjmos
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/rewrite-target: /$2
    nginx.ingress.kubernetes.io/use-regex: "true"
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
spec:
  rules:
    - http:
        paths:
          - path: /service-registry(/|$)(.*)
            backend:
              serviceName: eureka
              servicePort: 8761
---
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: eureka-ingress
  namespace: hjmos
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/use-regex: "true"
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
spec:
  rules:
    - http:
        paths:
          - path: /eureka
            backend:
              serviceName: eureka
              servicePort: 8761

From outside the cluster, access the registry at http://ip:port/service-registry; it is better for the Ingress to use a host-based rule.

A Eureka Client can register with the Eureka cluster in three ways:

  1. If the Eureka client and the Eureka servers are deployed in the same Kubernetes namespace, use the eureka_service_address value from the ConfigMap named eureka-cm.

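  2. If the Eureka client and the Eureka servers are deployed in the same Kubernetes cluster, use the domain names of the headless Service. The example below assumes Eureka is deployed in the default namespace (replace default with the actual namespace, e.g. hjmos for the manifests above, and list one URL per replica):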

    eureka:
      client:
        serviceUrl:
          defaultZone: http://eureka-0.eureka.default.svc.cluster.local:8761/eureka/,http://eureka-1.eureka.default.svc.cluster.local:8761/eureka/,http://eureka-2.eureka.default.svc.cluster.local:8761/eureka/
    
  3. Through the Ingress: http://ip:port/service-registry