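The earlier parts of this series analyzed the client-side logic of Nacos when it is used as a configuration center.
The storage structure on the Nacos server side will be analyzed separately later.
This part analyzes how the client registers a service when Nacos is used as a registry, using Dubbo + Nacos as the example. First, add the dependencies: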
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
The Dubbo registry configuration:
dubbo:
  registry:
    address: nacos://127.0.0.1:8848
Define a service:
@Service
public class HelloServiceImpl implements HelloService {
    @Override
    public void sayHello() {
        System.out.println("hello spring cloud alibaba with dubbo and nacos.");
    }
}
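Note that @Service here is Dubbo's annotation, not Spring's. The package scan path must also be defined in the Dubbo configuration.
After the service starts, check the service list on the Nacos server:
With the test code in place, we can start walking through the flow.
As we know, Dubbo initializes the registry client and sends the service registration request while exporting a service. Here is the register method of RegistryProtocol: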
public void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);
    ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
    model.addStatedUrl(new ProviderModel.RegisterStatedURL(
        registeredProviderUrl,
        registryUrl,
        true
    ));
}
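Because the configured registry is Nacos, the registryUrl obtained here starts with the nacos protocol, in the form nacos://127.0.0.1:8848/xxxxx. registryFactory is an adaptive extension, and for the nacos protocol it selects NacosRegistryFactory. First, look at the getRegistry method of the parent class AbstractRegistryFactory: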
@Override
public Registry getRegistry(URL url) {
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
            "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }
    url = URLBuilder.from(url)
        .setPath(RegistryService.class.getName())
        .addParameter(INTERFACE_KEY, RegistryService.class.getName())
        .removeParameters(EXPORT_KEY, REFER_KEY)
        .build();
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
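getRegistry is mainly a cache lookup: if nothing is cached for the key, it calls the template method createRegistry. Here is the implementation in NacosRegistryFactory: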
@Override
protected Registry createRegistry(URL url) {
    return new NacosRegistry(url, createNamingService(url));
}
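The cache-then-create pattern used by getRegistry and createRegistry can be looked at in isolation. The following is a minimal sketch, not Dubbo code: DemoRegistry, CachingRegistryFactory and CountingRegistryFactory are hypothetical names, and the cache key is simply the address string rather than a key derived from a Dubbo URL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a registry client; not a Dubbo type. */
interface DemoRegistry {
    String address();
}

/** Sketch of the cache-then-create template; all names are illustrative. */
abstract class CachingRegistryFactory {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, DemoRegistry> registries = new HashMap<>();

    /** Returns the cached registry for the address, creating it at most once. */
    public DemoRegistry getRegistry(String address) {
        lock.lock();
        try {
            DemoRegistry registry = registries.get(address);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(address);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + address);
            }
            registries.put(address, registry);
            return registry;
        } finally {
            lock.unlock();
        }
    }

    /** Template method: the subclass decides which concrete registry to build. */
    protected abstract DemoRegistry createRegistry(String address);
}

/** Counts how often createRegistry runs, to make the caching visible. */
class CountingRegistryFactory extends CachingRegistryFactory {
    final AtomicInteger created = new AtomicInteger();

    @Override
    protected DemoRegistry createRegistry(String address) {
        created.incrementAndGet();
        return () -> address;
    }
}
```

Calling getRegistry twice with the same address on a CountingRegistryFactory returns the same object and leaves the created counter at 1, which is the single-instance guarantee the lock is there to protect.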
createRegistry simply constructs a NacosRegistry object. The createNamingService method it calls deserves a closer look:
public static NamingService createNamingService(URL connectionURL) {
    Properties nacosProperties = buildNacosProperties(connectionURL);
    NamingService namingService;
    try {
        namingService = NacosFactory.createNamingService(nacosProperties);
    } catch (NacosException e) {
        if (logger.isErrorEnabled()) {
            logger.error(e.getErrMsg(), e);
        }
        throw new IllegalStateException(e);
    }
    return namingService;
}
NamingService (the naming service) is the client-side interface of Nacos as a registry, and it defines all the related operations. Here is how it is created:
public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}
Creation is delegated to the NamingFactory factory:
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
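As with ConfigService, the implementation class name is specified directly and the instance is created through reflection. (The details of the initialization are analyzed later.)
At this point we have a NacosRegistry with a NamingService injected into it.
Next, the registry's register method (inherited from FailbackRegistry) is called to register the service: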
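The reflective creation step can be tried out without Nacos on the classpath. This is a minimal sketch under stated assumptions: DemoNamingService, DemoNamingServiceImpl and ReflectiveNamingFactory are hypothetical names, and the "serverAddr" property key is used only for illustration. The only point is the mechanics: load a class by name, find the constructor that takes Properties, and instantiate it.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

/** Hypothetical service interface standing in for NamingService. */
interface DemoNamingService {
    String serverAddr();
}

/** Hypothetical implementation; the factory only ever sees its class name. */
class DemoNamingServiceImpl implements DemoNamingService {
    private final String serverAddr;

    public DemoNamingServiceImpl(Properties properties) {
        this.serverAddr = properties.getProperty("serverAddr");
    }

    @Override
    public String serverAddr() {
        return serverAddr;
    }
}

/** Sketch of creating an implementation by class name through reflection. */
class ReflectiveNamingFactory {
    static DemoNamingService create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            // Look up the public constructor that accepts a Properties argument.
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoNamingService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }
}
```

A caller would pass the class name as a string, for example DemoNamingServiceImpl.class.getName() in this sketch. The benefit of this style is that the factory has no compile-time dependency on the implementation class.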
@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;
        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}
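The register method above combines a template method (doRegister) with failback handling: a failure either surfaces immediately when startup checking is on, or is recorded for a later retry. Here is a stripped-down sketch of that idea. FailbackRegistrar and FlakyRegistrar are hypothetical names, URLs are plain strings, and the retry is triggered by hand rather than by a timer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the failback idea: try once, remember failures for a later retry. */
abstract class FailbackRegistrar {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final boolean check;

    FailbackRegistrar(boolean check) {
        this.check = check;
    }

    public void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister(url);
        } catch (Exception e) {
            if (check) {
                // Startup check enabled: fail fast instead of retrying later.
                throw new IllegalStateException(
                        "Failed to register " + url + ", cause: " + e.getMessage(), e);
            }
            // Otherwise record the failure so that it can be retried.
            failedRegistered.add(url);
        }
    }

    /** Re-attempts every recorded failure; a real client would run this periodically. */
    public void retryFailed() {
        for (String url : failedRegistered) {
            try {
                doRegister(url);
                failedRegistered.remove(url);
            } catch (Exception e) {
                // Still failing: keep the URL for the next round.
            }
        }
    }

    public Set<String> failedUrls() {
        return failedRegistered;
    }

    /** Template method: the concrete registry performs the real registration. */
    protected abstract void doRegister(String url) throws Exception;
}

/** Demo subclass whose doRegister fails until 'available' is switched on. */
class FlakyRegistrar extends FailbackRegistrar {
    volatile boolean available = false;

    FlakyRegistrar(boolean check) {
        super(check);
    }

    @Override
    protected void doRegister(String url) throws Exception {
        if (!available) {
            throw new Exception("server unreachable");
        }
    }
}
```

With check set to false, a failed URL stays in failedUrls() until retryFailed() succeeds. With check set to true, register throws IllegalStateException and nothing is queued.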
The template method doRegister performs the actual registration. Here is the NacosRegistry implementation:
@Override
public void doRegister(URL url) {
    final String serviceName = getServiceName(url);
    final Instance instance = createInstance(url);
    execute(namingService -> namingService.registerInstance(serviceName, instance));
}
doRegister obtains the service name, builds an Instance object, and calls namingService.registerInstance to register it:
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
ephemeral defaults to true, meaning the instance is an ephemeral node, so a heartbeat task is added for it.
Then serverProxy.registerService performs the registration:
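To make the "heartbeat only for ephemeral instances" rule concrete, here is a small sketch built on a ScheduledExecutorService. It is not the Nacos BeatReactor: DemoBeatReactor and its methods are hypothetical, the heartbeat is just a callback, and the group@@service key format is my assumption about what NamingUtils.getGroupedName produces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch of a heartbeat scheduler for ephemeral instances. */
class DemoBeatReactor {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "demo-beat-sender");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });
    private final Map<String, Long> beats = new ConcurrentHashMap<>();
    private final Consumer<String> beatSender;

    DemoBeatReactor(Consumer<String> beatSender) {
        this.beatSender = beatSender;
    }

    /** Assumed key format: group name, "@@", service name. */
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    void registerInstance(String serviceName, String groupName,
                          boolean ephemeral, long periodMillis) {
        String key = groupedName(serviceName, groupName);
        // Only ephemeral instances get a recurring heartbeat task, and only once per key.
        if (ephemeral && beats.putIfAbsent(key, periodMillis) == null) {
            executor.scheduleAtFixedRate(
                    () -> beatSender.accept(key), periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }
        // A real client would now send the registration request itself.
    }

    boolean hasBeat(String groupedName) {
        return beats.containsKey(groupedName);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Registering a non-ephemeral instance leaves the beat map untouched, so keeping such an instance alive is then not the client's job.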
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);
    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
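registerService boils the instance down to a flat map of string parameters before the HTTP call. As a rough illustration of what such a map looks like on the wire, the sketch below form-encodes a subset of the parameters. InstanceRequestSketch is a hypothetical class, and the literal key names "namespaceId" and "serviceName" are assumptions standing in for the CommonParams constants.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Sketch of turning instance fields into a form-encoded parameter string. */
class InstanceRequestSketch {

    /** Collects a subset of the parameters; the first two key names are assumptions. */
    static Map<String, String> buildParams(String namespaceId, String serviceName,
                                           String ip, int port, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    /** Joins the parameters as key=value pairs separated by '&', URL-encoding both sides. */
    static String encode(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(urlEncode(entry.getKey()) + "=" + urlEncode(entry.getValue()));
        }
        return joiner.toString();
    }

    private static String urlEncode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }
}
```

Note that a grouped service name such as DEFAULT_GROUP@@HelloService contains characters that need encoding, which is why the values go through URLEncoder instead of being concatenated directly.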
registerService prepares the HTTP call. The URL is /nacos/v1/ns/instance, and the request ends up in the following reqAPI overload:
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }
    NacosException exception = new NacosException();
    if (servers != null && !servers.isEmpty()) {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }
    if (StringUtils.isNotBlank(nacosDomain)) {
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    }
    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
        api, servers, exception.getErrCode(), exception.getErrMsg());
    throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
        + exception.getMessage());
}
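The HTTP call has a failure-retry mechanism, which makes sense because a Nacos server is usually deployed as a cluster.
There are two retry cases: if a server list is configured, the client starts from a random server and tries each server in turn until one call succeeds; if a domain name (nacosDomain) is configured, the client retries the call against that domain up to UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT times.
Note that this method is defined in NamingProxy, which means that every naming-service operation that goes through it gets the same failure-retry behavior.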
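The retry strategy in reqAPI can be reduced to a few lines once the HTTP details are taken out. The sketch below keeps the same shape (random starting index, round-robin over the server list, then a fixed number of attempts against the domain) but is not Nacos code: FailoverCaller is a hypothetical class, the actual call is an injected function, and the value of DOMAIN_RETRY_COUNT is picked arbitrarily for the example.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Sketch of "try every server once, then retry the domain" failover. */
class FailoverCaller {
    /** Arbitrary value for this sketch; the real constant lives in UtilAndComs. */
    static final int DOMAIN_RETRY_COUNT = 3;

    static String call(List<String> servers, String domain,
                       Function<String, String> callServer) {
        boolean noServers = servers == null || servers.isEmpty();
        boolean noDomain = domain == null || domain.trim().isEmpty();
        if (noServers && noDomain) {
            throw new IllegalStateException("no server available");
        }
        RuntimeException last = null;
        if (!noServers) {
            // Start at a random position so load is spread across the cluster.
            int index = new Random().nextInt(servers.size());
            for (int i = 0; i < servers.size(); i++) {
                try {
                    return callServer.apply(servers.get(index));
                } catch (RuntimeException e) {
                    last = e;
                }
                index = (index + 1) % servers.size();
            }
        }
        if (!noDomain) {
            // A single domain name is simply retried a fixed number of times.
            for (int i = 0; i < DOMAIN_RETRY_COUNT; i++) {
                try {
                    return callServer.apply(domain);
                } catch (RuntimeException e) {
                    last = e;
                }
            }
        }
        throw new IllegalStateException(
                "failed after all servers(" + servers + ") tried", last);
    }
}
```

Because the starting index is random, the number of failed attempts before a success varies between runs, but each server in the list is tried at most once per call.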
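That is essentially all there is to service registration. In short, registration goes through a NamingService instance and needs a service name and a service instance object. It is also quite similar to ZooKeeper: there is the concept of an ephemeral node, which has to be kept alive with heartbeats. Finally, NamingService has a failure-retry mechanism.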
Part of this analysis is tied to Dubbo; readers interested in Dubbo itself can look into it further.
Copyright notice: this is a reposted article; copyright belongs to the original author.
Original link: https://blog.csdn.net/qq_19414183/article/details/112307969