Nacos指南-服务发现:注册实例(二)

x33g5p2x  于2021-12-20 转载在 其他  
字(6.2k)|赞(0)|评价(0)|浏览(371)

接上回注册实例末尾我们提出的问题发现Nacos的服务端对于注册的服务与实例是有一个持续的健康性与存活性的检查机制,今天我们就来一探究竟,看看Nacos是如何实现的。

服务自动清理

ServiceManager

当配置文件中开启自动清理时,

nacos.naming.empty-service.auto-clean=true
/** * 服务管理中心的初始化方法 * Init service maneger. */
    @PostConstruct
    public void init() {
        //服务状态上报 延迟60s, 执行频率5s;
        GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
		//服务实例健康检查处理器
        GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
		//nacos.naming.empty-service.auto-clean 配置是否开启自动清理的参数
        if (emptyServiceAutoClean) {

            Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                    cleanEmptyServiceDelay, cleanEmptyServicePeriod);

            // 延迟60s, 执行频率20s;
            // 该任务不建议频繁执行,以避免服务缓存被删除然后又因为心跳机制重复创建
            GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,cleanEmptyServicePeriod);
        }

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            //一致性服务监听服务meta信息变更
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }
    /** * Register an instance to a service in AP mode. * 使用AP模式(舍弃强一致性而保证系统的分区容错性和可用性的场景)注册实例 * <p>This method creates service or cluster silently if they don't exist. * * @param namespaceId id of namespace * @param serviceName service name * @param instance instance to register * @throws Exception any error occurred in the process */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //先判断服务是否存在,如果不存在则创建一个新的
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
		//根据命名空间及服务名获取服务
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
		//把当前实例添加进服务列表
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }  
    /** * Create service if not exist. * 获取服务,如果不存在则创建一个新的服务 * @param namespaceId namespace * @param serviceName service name * @param local whether create service by local * @param cluster cluster * @throws NacosException nacos exception */
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        //获取服务
        Service service = getService(namespaceId, serviceName);
        //服务不存在则创建
        if (service == null) {
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            //创建一个服务
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            //服务校验(服务名称正则:"[0-9a-zA-Z@\\.:_-]+",集群名称正则:"[0-9a-zA-Z-]+")
            service.validate();
			//初始化服务
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

EmptyServiceAutoClean

private class EmptyServiceAutoClean implements Runnable {

        @Override
        public void run() {

            // 开启并行处理的临界值
            int parallelSize = 100;

            serviceMap.forEach((namespace, stringServiceMap) -> {
                Stream<Map.Entry<String, Service>> stream = null;
                // 若当前服务集合数量大于100,开启并行流处理
                if (stringServiceMap.size() > parallelSize) {
                    stream = stringServiceMap.entrySet().parallelStream();
                } else {
                    stream = stringServiceMap.entrySet().stream();
                }
                stream.filter(entry -> {
                    final String serviceName = entry.getKey();
                    return distroMapper.responsible(serviceName);
                }).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
                    if (service.isEmpty()) {
                        // 避免暴力删除当服务的回收标志次数大于回收阈值(默认为3次),执行删除服务
                        if (service.getFinalizeCount() > maxFinalizeCount) {
                            Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
                                    serviceName);
                            try {
                                easyRemoveService(namespace, serviceName);
                            } catch (Exception e) {
                                Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
                                        + "error : {}", namespace, serviceName, e);
                            }
                        }
						// 未达到阈值,回收标志+1
                        service.setFinalizeCount(service.getFinalizeCount() + 1);

                        Loggers.SRV_LOG
                                .debug("namespace : {}, [{}] The number of times the current service experiences "
                                                + "an empty instance is : {}", namespace, serviceName,
                                        service.getFinalizeCount());
                    } else {
                        service.setFinalizeCount(0);
                    }
                    return service;
                }));
            });
        }
    }

Service

/** * Init service. * 初始化服务 */
    public void init() {
        //心跳检查任务
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        //集群检查
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

ClientBeatCheckTask

public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
                //当前时间与末次心跳时间间隔大于阈值(默认15s)则实例状态设置为"不健康"状态
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            //发布服务变更事件
                            getPushService().serviceChanged(service);
                            //发布心跳超时事件
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                //若当前时间与末次心跳时间间隔大于阈值(默认30s)则将实例节点从服务列表中删除
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    //删除实例节点 
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }

总结

通过本次源码分析,再次结合本人上篇绘制的流程图,相信大部分同学已经十分清楚Nacos服务端对于服务与实例的健康检查的实现了。

再次附上流程图

上一篇:Nacos注册实例
下一篇:Nacos注销实例

相关文章