Nacos源码分析十一、客户端服务注册

x33g5p2x  于2021-12-20 转载在 其他  
字(8.9k)|赞(0)|评价(0)|浏览(438)

前面部分主要是分析了nacos作为配置中心时,客户端的相关逻辑,主要包括:

  1. configService的初始化,对数据的监听
  2. 结合spring-cloud实现动态配置更新。

关于nacos服务端的存储结构后面再统一分析。

本篇分析nacos作为注册中心时客户端如何进行服务注册。我们以dubbo+nacos为例。首先添加依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

dubbo的注册中心配置:

dubbo:
  registry:
    address: nacos://127.0.0.1:8848

定义一个service服务:

@Service
public class HelloServiceImpl implements HelloService {
    @Override
    public void sayHello() {
        System.out.println("hello spring cloud alibaba with dubbo and nacos.");
    }
}

注意这里的@Service是dubbo的。同时需要dubbo配置中定义包扫描路径。

服务启动后查看nacos服务端的服务列表:

测试代码搞完了,我们开始分析流程

获取Registry注册中心实例

我们知道dubbo在服务导出时初始化注册中心客户端并发起服务注册请求。我们看一下RegistryProtocol的register方法:

public void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);

    ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
    model.addStatedUrl(new ProviderModel.RegisterStatedURL(
            registeredProviderUrl,
            registryUrl,
            true
    ));
}

由于我们定义的注册中心是nacos的,这里得到的registryUrl是nacos协议开头,即nacos://127.0.0.1:8848/xxxxx 这种格式。registryFactory是一个自适应扩展,根据协议类型nacos会选择NacosRegistryFactory,先看父类AbstractRegistryFactory的getRegistry方法:

@Override
public Registry getRegistry(URL url) {
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }

    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

主要是缓存,如果缓存取不到调用createRegistry模板方法,我们看一下NacosRegistryFactory的实现:

@Override
protected Registry createRegistry(URL url) {
    return new NacosRegistry(url, createNamingService(url));
}

就是初始化一个NacosRegistry对象,这里要注意一下createNamingService方法:

public static NamingService createNamingService(URL connectionURL) {
    Properties nacosProperties = buildNacosProperties(connectionURL);
    NamingService namingService;
    try {
        namingService = NacosFactory.createNamingService(nacosProperties);
    } catch (NacosException e) {
        if (logger.isErrorEnabled()) {
            logger.error(e.getErrMsg(), e);
        }
        throw new IllegalStateException(e);
    }
    return namingService;
}

nacos作为注册中心客户端的接口就是namingService,即名字服务,相关操作都由该接口定义,我们看一下创建方法:

public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}

由NamingFactory工厂创建:

public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

可以看到,和ConfigService类似,直接指定加载类名,然后通过反射创建。(具体初始化内容后面再分析)

由此我们得到了nacosRegistry,并且里面注入了一个namingService。

注册流程

下面是registry调用register方法进行服务注册:

@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}

模板方法doRegister进行真正的注册操作,我们看NacosRegistry的实现:

@Override
public void doRegister(URL url) {
    final String serviceName = getServiceName(url);
    final Instance instance = createInstance(url);
    execute(namingService -> namingService.registerInstance(serviceName, instance));
}

获得服务名称,构造Instance对象,调用namingService.registerInstance方法进行注册

@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

ephemeral默认是true,临时节点,即临时节点添加心跳检测。

然后是serverProxy.registerService进行服务注册

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

开始准备http接口调用了。url是:/nacos//v1/ns/instance

public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (servers != null && !servers.isEmpty()) {

        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }

    if (StringUtils.isNotBlank(nacosDomain)) {
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
        api, servers, exception.getErrCode(), exception.getErrMsg());

    throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
        + exception.getMessage());

}

http调用的时候有个失败重试机制。考虑到一般nacos服务端都是集群部署,这里是有失败重试策略的。

有两种失败重试的情况:

  1. 如果配置的是服务列表。这是一个简单的轮询,先随机一个index,然后每次失败后+1并和总数取余再作为index,总此时不超过服务的个数。一个简单的闭环。
  2. 如果有服务的域名配置,则直接网域名配置发送,重试3次。 – 这种情况等于是nacos服务集群对外一个服务地址,负载策略由部署方案决定(比如加个nginx负载)。

需要注意的是这个方法定义在NamingProxy中,也就是说通过该接口进行名字服务管理,都是用失败重试机制的。

总结

服务注册这部分基本就是这样了。简单来说就是通过NamingService实例来进行服务注册。需要准备服务名称和服务实例对象。另外这个地方和zk也蛮像的,有临时节点的概念,需要添加心跳维持。 最后要说明的是NamingService具有失败重发机制。
分析的过程中有部分是结合dubbo的,关于dubbo有兴趣的朋友可以点击这里了解更多。

相关文章