Nacos源码分析十七、服务端实例注册分析

x33g5p2x  于2021-12-20 转载在 其他  
字(25.0k)|赞(0)|评价(0)|浏览(407)

本篇讨论nacos服务端对于实例注册的处理流程。

先回想一下客户端如何发起注册请求的,代码在NacosNamingService的registerService方法:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

临时节点添加心跳保持,然后调用serverProxy.registerService注册服务:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

不往里跟了,url是/nacos/v1/ns/instance。下面我们把目光转向服务端。首先是接收的接口InstanceController类,对应的方法是:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    
    final Instance instance = parseInstance(request);
    
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

首先根据请求封装一个Instance实例对象,然后调用serviceManager的registerInstance方法进行实例注册:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // 创建服务
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // 获取服务
    Service service = getService(namespaceId, serviceName);

    // 再取一次,如果是空,可能是没有心跳被移除了
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // 添加实例
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

同一个服务可能有多个实例,所以这里只是提供了实例接口,先判断服务是否存在,如果不存在则创建一个新的,最后把实例添加进去。

createEmptyService 创建服务

跟一下代码到createServiceIfAbsent方法:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // 先获取服务
    Service service = getService(namespaceId, serviceName);
    // 如果服务不存在则创建一个
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        // new 一个
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        //计算校验和
        service.recalculateChecksum();
        //有集群要添加
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        //服务验证,服务和集群名验证
        service.validate();

        //服务初始化
        putServiceAndInit(service);
        if (!local) {
            //永久服务还要添加到一致性服务里
            addOrReplaceService(service);
        }
    }
}
getService

先看一下服务是否存在,如果不存在则创建一个新的。看一下getService方法:

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    // 对应命名空间下的服务名
    return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
	return serviceMap.get(namespaceId);
}

这个serviceMap的结构是Map<namespaceId,Map<serviceName,Service>>。

服务初始化putServiceAndInit方法的过程:

private void putServiceAndInit(Service service) throws NacosException {
    //添加到命名空间中
    putService(service);
    //心跳初始化
    service.init();
    //生成key放入一致性服务里,永久的和临时的
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

putService就是往serviceMap中添加新的服务对象。

init方法:
public void init() {
    // 创建一个5秒的周期任务, 检查心跳 延迟5秒,周期5秒
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

启一个clientBeatCheckTask任务。 下面关于Cluster的暂时先不看。 这里暂时没有。

我们看一下这个任务的执行逻辑:

@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        // 当前service的所有实例
        List<Instance> instances = service.allIPs(true);
        
        // first set health status of instances:
        for (Instance instance : instances) {
            // 超过健康检查时间
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        
        // then remove obsolete instances:
        for (Instance instance : instances) {
            
            if (instance.isMarked()) {
                continue;
            }

            // 超过删除检查时间
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
        
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
    
}

检查该服务下的所有实例,如果最后上报的心跳时间超过了InstanceHeartBeatTimeOut时间,则发布事件,这里发布了两个事件ServiceChangeEvent和InstanceHeartbeatTimeoutEvent事件。

然后再检查是否超过了删除时间,如果过了删除时间,则执行deleteIp删除:

private void deleteIp(Instance instance) {
    
    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
        
        String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
        
        // delete instance asynchronously:
        HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
            @Override
            public Object onCompleted(Response response) throws Exception {
                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                    Loggers.SRV_LOG
                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                    instance.toJson(), response.getResponseBody(), response.getStatusCode());
                }
                return null;
            }
        });
        
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
    }
}

调的是本机自己的删除实例接口。

ServiceChangeEvent

回来看心跳超时的两个事件。首先是ServiceChangeEvent。处理的监听器是PushService,我们看一下它的实现:

@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            //获取所有需要推送的PushClient
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                //超时的不删除不处理
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                //有压缩数据
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }

                //准备UDP数据
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));

                //发送
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            
        } finally {
            //发送完删除
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
        
    }, 1000, TimeUnit.MILLISECONDS);
    
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    
}

可以看到就是发送UDP数据通知所有客户端。我们看一下udpPush:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    //大于尝试的次数 记录日志,failedPush+1,移除待发送数据,确认ackMap移除对应的key
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        //发送UDP报文
        udpSocket.send(ackEntry.origin);
        
        ackEntry.increaseRetryTime();

        //10秒没应答就再尝试一次
        // Retransmitter里面重新调用udpPush
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        
        return null;
    }
}

注意UDP的不可靠性,这里会重新尝试。我们看一下Retransmitter:

public static class Retransmitter implements Runnable {
    
    Receiver.AckEntry ackEntry;
    
    public Retransmitter(Receiver.AckEntry ackEntry) {
        this.ackEntry = ackEntry;
    }
    
    @Override
    public void run() {
        //没接受到响应。收到响应后会从ackMap中移除
        if (ackMap.containsKey(ackEntry.key)) {
            Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
            // 再次发送
            udpPush(ackEntry);
        }
    }
}

还是调用udpPush。

InstanceHeartbeatTimeoutEvent

这个事件没找到哪儿处理的,有兴趣的朋友可以找找看。

consistencyService的listen
consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);

一致性服务监听两个key,一个是临时的,一个是永久的。 这里的key前缀是com.alibaba.nacos.naming.iplist.,如果是临时的,后面多个ephemeral.

然后是DelegateConsistencyServiceImpl代理一致性服务

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    
    // this special key is listened by both:
    // 特殊前缀key两个都监听
    // com.alibaba.nacos.naming.domains.meta.
    if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
        persistentConsistencyService.listen(key, listener);
        ephemeralConsistencyService.listen(key, listener);
        return;
    }

    // 根据key设置不同的一致性监听
    mapConsistencyService(key).listen(key, listener);
}
 private ConsistencyService mapConsistencyService(String key) {
     // com.alibaba.nacos.naming.iplist.ephemeral.
     return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
 }

我们看一下临时的DistroConsistencyServiceImpl

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    // ConcurrentLinkedQueue队列
    if (!listeners.containsKey(key)) {
        listeners.put(key, new ConcurrentLinkedQueue<>());
    }

    // 二次确认不重复添加
    if (listeners.get(key).contains(listener)) {
        return;
    }
    // 添加到队列中
    listeners.get(key).add(listener);
}

就是添加到监听队列里。我们看一下监听什么。首先看一下DistroConsistencyServiceImpl的初始化方法:

@PostConstruct
public void init() {
    GlobalExecutor.submitLoadDataTask(loadDataTask);
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

添加了两个任务,loadDataTask是启动加载的,这个以后再说。现在看notifier

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    // 无限循环
    for (; ; ) {
        try {
            // 消费队列
            Pair<String, ApplyAction> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

当有task添加进来后,这里从队列中取出来,然后处理:

private void handle(Pair<String, ApplyAction> pair) {
    try {
        String datumKey = pair.getValue0();
        ApplyAction action = pair.getValue1();

        // 消费后移除key
        services.remove(datumKey);
        
        int count = 0;

        // 没有监听直接退出
        if (!listeners.containsKey(datumKey)) {
            return;
        }

        // 遍历监听器,调动对应的监听方法
        for (RecordListener listener : listeners.get(datumKey)) {
            
            count++;
            
            try {
                if (action == ApplyAction.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                
                if (action == ApplyAction.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

可以看到监听onChange和onDelete。也就是说每个服务service创建出来后会添加到监听器列表中,当出现修改和删除时处理对应的变更。

addOrReplaceService

当要添加的服务不是临时服务时,会调用这个方法:

public void addOrReplaceService(Service service) throws NacosException {
    consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
}

然后是RaftConsistencyServiceImpl的put方法:

@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}

然后是signalPublish方法:

public void signalPublish(String key, Record value) throws Exception {

    //不是leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();

        //交给leader去做/v1/ns/raft/datum
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    try {
        // 是leader
        // 加锁
        OPERATE_LOCK.lock();
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

        //发布数据改变通知  peers是所有节点集合. peers.local获取本机
        onPublish(datum, peers.local());
        
        final String content = json.toString();

        // 过半同步成功才会响应,也就是说put操作需要过半同步成功,强一致性 CP模型
        //只要过半的结点数
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        //遍历所有结点
        for (final String server : peers.allServersIncludeMyself()) {
            //自己算一次
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // /v1/ns/raft/datum/commit
            final String url = buildUrl(server, API_ON_PUB);
            // 异步同步数据
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            // 处理完成coundDown
                            latch.countDown();
                            return 0;
                        }
                        
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
            
        }

        //等待半数完成  还有个5秒超时时间
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        // 解锁
        OPERATE_LOCK.unlock();
    }
}

可以看到这里涉及到raft部分的了。如果当前节点不是leader,则转发给leader处理。

如果是leader,onPublish进行本地数据保存,发布change任务。 然后通知所有从节点,使用CountDownLatch计数,当有超过一半+1个响应了,则提交完成了。我们看一下CountDownLatch的个数怎么来的peers.majorityCount():

public int majorityCount() {
    return peers.size() / 2 + 1;
}

节点数/2+1,即半数+1个。

回来看一下onPublish方法:

public void onPublish(Datum datum, RaftPeer source) throws Exception {
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }

    //不是leader不能干这个事
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }

    //过时了
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }

    //重置任期
    local.resetLeaderDue();
    
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    //放入数据
    datums.put(datum.key, datum);
    
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());

    //添加任务
    notifier.addTask(datum.key, ApplyAction.CHANGE);
    
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

raftStore做了本地文件存储。然后addTask添加change任务。

至此新增服务部分完成了,下面是往服务里添加实例。

addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {

    //获得服务实例key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    //再次获取服务
    Service service = getService(namespaceId, serviceName);

    // 加锁
    synchronized (service) {
        //添加并获取所有该服务的实例
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        //设置到包装对象里
        instances.setInstanceList(instanceList);

        //放进一致性服务里。这里根据key来选择是临时性的还是永久性的
        consistencyService.put(key, instances);
    }
}

首先是添加新的实例

addIpAddresses
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

updateIpAddresses方法:

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {

    //获取老的实例集合数据
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    //获取集群中所有相关的实例集合,临时的或者是永久的
    List<Instance> currentIPs = service.allIPs(ephemeral);
    //IP端口和实例的映射
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    //实例ID集合
    Set<String> currentInstanceIds = Sets.newHashSet();

    //放入对应的集合里
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }

    //更新后的老的实例集合
    Map<String, Instance> instanceMap;
    if (datum != null) {
        //根据当前服务实例的健康标志和心跳时间,来更新老的实例集合数据
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        //重新创建一个
        instanceMap = new HashMap<>(ips.length);
    }

    //遍历新的实例
    for (Instance instance : ips) {
        //不存在就创建服务实例集群
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            //初始化,开启集群心跳检查
            cluster.init();
            //添加服务实例集群
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }

        //删除操作的话就删除老的实例集合的数据
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            //否则添加
            instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            instanceMap.put(instance.getDatumKey(), instance);
        }
        
    }
    
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }

    //返回总的实例集合
    return new ArrayList<>(instanceMap.values());
}

这是一个统一的更新操作,新旧比较后处理完返回新的数据

然后丢到一致性服务里

consistencyService.put

前面分析过RaftConsistencyServiceImpl的put了,这里我们看一下临时的DistroConsistencyServiceImpl的put方法:

@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // 临时一致性协议的同步数据。这里同步数据是异步任务执行的,也就是说先返回客户端put成功再同步,弱一致性。 AP模型
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), ApplyAction.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

首先是onPut方法:

public void onPut(String key, Record value) {
    
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        //创建临时数据
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        //放进一个map里
        dataStore.put(key, datum);
    }

    //没有监听器就返回
    if (!listeners.containsKey(key)) {
        return;
    }

    //有监听立即通知服务有改变
    notifier.addTask(key, ApplyAction.CHANGE);
}

可以看到addTask一个change任务。即新实例添加变更事件。

下面是distroProtocol.sync:

public void sync(DistroKey distroKey, ApplyAction action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // holder持有临时同步延迟执行器引擎,引擎中有NacosTaskProcessor,临时一致性情况下实际上持有的是DistroDelayTaskProcessor,添加任务后最终由processor执行
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

这里是添加了distroDelayTask任务,通过DistroDelayTaskProcessor处理:

@Override
public boolean process(AbstractDelayTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    // 发起临时一致性同步任务
    if (ApplyAction.CHANGE.equals(distroDelayTask.getAction())) {
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
        return true;
    }
    return false;
}

syncChangeTask是异步线程了,我们看一下他的run方法:

@Override
public void run() {
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    try {
        String type = getDistroKey().getResourceType();
        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
        distroData.setType(ApplyAction.CHANGE);
        // syncData执行数据同步,交由 NamingProxy.syncData执行 /nacos/v1/ns/distro/datum
        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
        // 同步失败重试,就是重新提交任务
        if (!result) {
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

发送rest请求,失败重试。

总结一下

到这里nacos服务端处理服务注册基本分析完了。简单总结一下:

  1. 首先看情况是否创建一个新的Service。

  2. 初始化时创建数据初始化加载任务和心跳检测任务

  3. 创建Service后需要添加监听,用于监听数据的修改和删除操作。

  4. 新服务Service 创建完成后需要把新的实例添加进去

  5. 新老实例比较一下进行更新

  6. 触发一致性服务的put方法,会先进行本地通知,然后进行集群数据同步。

  7. 过程中我们分析了两种集群同步数据的方式

  8. AP模型,弱一致性。通过创建DistroSyncChangeTask任务进行通知。主线程不等待集群通知结果先返回给用户。

  9. CP模型,强一致性。通过RaftCore的signalPublish方法进行数据同步,要求半数以上有响应才会成功。

相关文章