Nacos Source Code Analysis, Part 2: Config Center (1)

x33g5p2x, reposted on 2021-12-20 under Other

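The Nacos server can be deployed in two modes, cluster and standalone. This walkthrough uses standalone mode.

For deployment details, see the official documentation.

A local standalone server is assumed to be running.

The analysis starts from the test class com.alibaba.nacos.example.ConfigExample.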

public static void main(String[] args) throws NacosException, InterruptedException {
    String serverAddr = "localhost";
    String dataId = "test";
    String group = "DEFAULT_GROUP";
    Properties properties = new Properties();
    properties.put("serverAddr", serverAddr);
    ConfigService configService = NacosFactory.createConfigService(properties);
    String content = configService.getConfig(dataId, group, 5000);
    System.out.println(content);
    configService.addListener(dataId, group, new Listener() {
        @Override
        public void receiveConfigInfo(String configInfo) {
            System.out.println("receive:" + configInfo);
        }

        @Override
        public Executor getExecutor() {
            return null;
        }
    });

    boolean isPublishOk = configService.publishConfig(dataId, group, "content");
    System.out.println(isPublishOk);

    Thread.sleep(3000);
    content = configService.getConfig(dataId, group, 5000);
    System.out.println(content);

    boolean isRemoveOk = configService.removeConfig(dataId, group);
    System.out.println(isRemoveOk);
    Thread.sleep(3000);

    content = configService.getConfig(dataId, group, 5000);
    System.out.println(content);
    Thread.sleep(300000);

}
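  1. Build a Properties object and add serverAddr.
  2. Create a ConfigService instance. This is the client-side interface when Nacos is used as a config center.
  3. Fetch the config content by dataId and group. A config item is uniquely identified by dataId, group, and namespace; ConfigService sets a default namespace during initialization.
  4. Add a listener.
  5. Publish, fetch, and remove the config.

First, how the ConfigService instance is obtained. The single line ConfigService configService = NacosFactory.createConfigService(properties); triggers essentially all of the client's initialization work.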

public static ConfigService createConfigService(Properties properties) throws NacosException {
    return ConfigFactory.createConfigService(properties);
}

It delegates to the ConfigFactory factory class to obtain the instance:

public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
  1. Load the Class object of NacosConfigService with Class.forName.
  2. Look up the constructor that takes a Properties parameter.
  3. Create the instance reflectively and return it.
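
The three reflection steps can be tried in isolation. The following is a minimal sketch, not the Nacos code: DemoService, DemoServiceImpl, and ReflectiveFactoryDemo are hypothetical names invented for illustration, and the failure is wrapped in an IllegalStateException instead of a NacosException.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

class ReflectiveFactoryDemo {
    /** Loads the class by name, finds its Properties constructor, and instantiates it. */
    static DemoService create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("serverAddr", "localhost");
        DemoService service = create(DemoServiceImpl.class.getName(), properties);
        System.out.println(service.serverAddr()); // prints "localhost"
    }
}

/** Hypothetical stand-in for the ConfigService interface. */
interface DemoService {
    String serverAddr();
}

/** Hypothetical implementation; getConstructor requires the constructor to be public. */
class DemoServiceImpl implements DemoService {
    private final String serverAddr;

    public DemoServiceImpl(Properties properties) {
        this.serverAddr = properties.getProperty("serverAddr", "localhost");
    }

    @Override
    public String serverAddr() {
        return serverAddr;
    }
}
```

Because the caller only names the implementation as a string, the factory has no compile-time dependency on the implementation class, which is presumably why the API module can create a client-module class this way.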

Next, the constructor of NacosConfigService:

public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties);

    // ServerHttpAgent is the HTTP agent.
    // MetricsHttpAgent wraps it once more.
    // It also hides the cluster logic: the methods it exposes are plain, uniform HTTP calls.
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
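  1. Validate the parameters.
  2. Set the encoding.
  3. Initialize the namespace.
  4. Build the HTTP agent and start it.
  5. Create the worker.

MetricsHttpAgent is simply a wrapper around ServerHttpAgent that adds Prometheus instrumentation for metrics collection. Take httpGet as an example: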

@Override
public HttpRestResult<String> httpGet(String path, Map<String, String> headers, Map<String, String> paramValues,
        String encode, long readTimeoutMs) throws Exception {
    Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("GET", path, "NA");
    HttpRestResult<String> result;
    try {
        result = httpAgent.httpGet(path, headers, paramValues, encode, readTimeoutMs);
    } catch (IOException e) {
        throw e;
    } finally {
        timer.observeDuration();
        timer.close();
    }
    
    return result;
}

It mainly records how long each request takes.
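
The wrapping idea is the classic decorator pattern: same interface, extra behavior around the delegate call. Below is a minimal sketch with hypothetical names (Agent, TimedAgent) and plain counters in place of the Prometheus Histogram, so it runs without any dependency.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class TimedAgentDemo {
    /** Hypothetical, much smaller stand-in for the HttpAgent interface. */
    interface Agent {
        String httpGet(String path);
    }

    /** Decorator: implements the same interface and times the delegate call. */
    static final class TimedAgent implements Agent {
        private final Agent delegate;
        final AtomicInteger calls = new AtomicInteger();
        final AtomicLong totalNanos = new AtomicLong();

        TimedAgent(Agent delegate) {
            this.delegate = delegate;
        }

        @Override
        public String httpGet(String path) {
            long start = System.nanoTime();
            try {
                return delegate.httpGet(path);
            } finally {
                // Runs on success and on failure, like timer.observeDuration().
                calls.incrementAndGet();
                totalNanos.addAndGet(System.nanoTime() - start);
            }
        }
    }

    public static void main(String[] args) {
        TimedAgent agent = new TimedAgent(path -> "response for " + path);
        System.out.println(agent.httpGet("/v1/cs/configs"));
        System.out.println("calls recorded: " + agent.calls.get());
    }
}
```

Recording the measurement in a finally block means failed requests are timed as well, which matches the structure of the httpGet method shown above.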

Now look at ServerHttpAgent:

public ServerHttpAgent(Properties properties) throws NacosException {
    // Server list (cluster) manager
    this.serverListMgr = new ServerListManager(properties);
    // Security and authentication
    this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE);
    // Namespace
    this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE);

    // Initialize encoding, maxRetry, ak and sk.
    // ak and sk are not used for login; they are checked when get, post and delete requests are sent.
    init(properties);
    // Log in
    this.securityProxy.login(this.serverListMgr.getServerUrls());
    
    // init executorService
    // Daemon thread
    this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.config.security.updater");
            t.setDaemon(true);
            return t;
        }
    });

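    // Call login every 5 seconds. login() checks whether the token is still inside its validity window and,
    // if it is not, fetches a new token and refreshes the window.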
    this.executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            securityProxy.login(serverListMgr.getServerUrls());
        }
    }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
    
}
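  1. Build the server list manager, ServerListManager.
  2. Build the security proxy, SecurityProxy.
  3. Read the namespace.
  4. Initialize encoding, maxRetry (maximum retry count), ak, sk, and related settings.
  5. Log in through securityProxy, then start a task that calls login again every 5 seconds.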
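
The scheduling part of step 5 can be sketched on its own. This is an illustrative example, not the Nacos code: the class name, the thread name, and the helper runsAtLeast are hypothetical, and the interval is shortened from 5 seconds to milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TokenRefreshSchedulerDemo {
    /** Starts one daemon thread that runs refresh immediately, then with a fixed delay between runs. */
    static ScheduledExecutorService startRefresher(Runnable refresh, long intervalMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r);
            t.setName("demo.security.updater"); // hypothetical thread name
            t.setDaemon(true);                  // a daemon thread does not keep the JVM alive
            return t;
        });
        executor.scheduleWithFixedDelay(refresh, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Returns true if the task ran at least `runs` times within timeoutMillis. */
    static boolean runsAtLeast(int runs, long intervalMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledExecutorService executor = startRefresher(latch::countDown, intervalMillis);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The Nacos client uses a 5-second interval; 50 ms keeps the demo short.
        System.out.println(runsAtLeast(3, 50, 5000));
    }
}
```

One detail worth knowing: scheduleWithFixedDelay suppresses all later executions if a run throws. The login(List) method catches Throwable and returns false instead, so a failed login does not stop the periodic refresh.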

Here is securityProxy.login:

public boolean login(List<String> servers) {
    
    try {
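        // The refresh window has not been reached yet: the token is still fresh, return true
        if ((System.currentTimeMillis() - lastRefreshTime) < TimeUnit.SECONDS
                .toMillis(tokenTtl - tokenRefreshWindow)) {
            return true;
        }

        // Try each server in the cluster until one login succeeds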
        for (String server : servers) {
            if (login(server)) {
                lastRefreshTime = System.currentTimeMillis();
                return true;
            }
        }
    } catch (Throwable ignore) {
    }
    
    return false;
}

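There is a time window here. If fewer than tokenTtl - tokenRefreshWindow seconds have passed since lastRefreshTime, the token has not expired and no new login is needed. Otherwise the client logs in again and updates lastRefreshTime.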
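
A short worked example makes the window arithmetic concrete. The sketch below copies only the comparison from login(List); the class and method names are hypothetical, and the TTL of 18000 seconds is an assumed value for illustration (the real value is whatever the server returns in the login response).

```java
import java.util.concurrent.TimeUnit;

class TokenWindowDemo {
    /**
     * Same comparison as the guard at the top of login(List): returns true while
     * no new login request is needed. tokenTtl and tokenRefreshWindow are in
     * seconds, the two timestamps are in milliseconds.
     */
    static boolean isFresh(long nowMillis, long lastRefreshMillis, long tokenTtl, long tokenRefreshWindow) {
        return (nowMillis - lastRefreshMillis) < TimeUnit.SECONDS.toMillis(tokenTtl - tokenRefreshWindow);
    }

    public static void main(String[] args) {
        long tokenTtl = 18000;                   // assumed TTL in seconds
        long tokenRefreshWindow = tokenTtl / 10; // 1800 s, as computed in login(String)
        long lastRefresh = 0;

        // 16199 s after the last login: still fresh, the scheduled run does nothing.
        System.out.println(isFresh(TimeUnit.SECONDS.toMillis(16199), lastRefresh, tokenTtl, tokenRefreshWindow)); // true
        // 16200 s after the last login: the next scheduled run logs in again.
        System.out.println(isFresh(TimeUnit.SECONDS.toMillis(16200), lastRefresh, tokenTtl, tokenRefreshWindow)); // false
    }
}
```

With these numbers the client re-authenticates once the last 10% of the token lifetime begins, so the 5-second task is cheap: almost every run returns at the first comparison.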

public boolean login(String server) throws UnsupportedEncodingException {
    
    if (StringUtils.isNotBlank(username)) {
        Map<String, String> params = new HashMap<String, String>(2);
        Map<String, String> bodyMap = new HashMap<String, String>(2);
        params.put("username", username);
        bodyMap.put("password", URLEncoder.encode(password, "utf-8"));
        String url = "http://" + server + contextPath + LOGIN_URL;
        
        if (server.contains(Constants.HTTP_PREFIX)) {
            url = server + contextPath + LOGIN_URL;
        }
        try {
            HttpRestResult<String> restResult = nacosRestTemplate
                    .postForm(url, Header.EMPTY, Query.newInstance().initParams(params), bodyMap, String.class);
            if (!restResult.ok()) {
                SECURITY_LOGGER.error("login failed: {}", JacksonUtils.toJson(restResult));
                return false;
            }
            JsonNode obj = JacksonUtils.toObj(restResult.getData());
            if (obj.has(Constants.ACCESS_TOKEN)) {
                accessToken = obj.get(Constants.ACCESS_TOKEN).asText();
                tokenTtl = obj.get(Constants.TOKEN_TTL).asInt();
                tokenRefreshWindow = tokenTtl / 10;
            }
        } catch (Exception e) {
            SECURITY_LOGGER.error("[SecurityProxy] login http request failed"
                    + " url: {}, params: {}, bodyMap: {}, errorMsg: {}", url, params, bodyMap, e.getMessage());
            return false;
        }
    }
    return true;
}

This is where the login request is actually sent. The URL is http://server/contextPath/v1/auth/users/login.
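
The URL construction is small enough to isolate. In this sketch the class name is hypothetical, the value "http" for HTTP_PREFIX and the context path "/nacos" are assumptions for illustration, and LOGIN_URL is taken from the URL stated above.

```java
class LoginUrlDemo {
    static final String HTTP_PREFIX = "http"; // assumed value of Constants.HTTP_PREFIX
    static final String LOGIN_URL = "/v1/auth/users/login";

    /** Prepends "http://" unless the server address already carries a scheme. */
    static String buildLoginUrl(String server, String contextPath) {
        if (server.contains(HTTP_PREFIX)) {
            return server + contextPath + LOGIN_URL;
        }
        return "http://" + server + contextPath + LOGIN_URL;
    }

    public static void main(String[] args) {
        // prints "http://localhost:8848/nacos/v1/auth/users/login"
        System.out.println(buildLoginUrl("localhost:8848", "/nacos"));
        // prints "https://nacos.example.com/nacos/v1/auth/users/login"
        System.out.println(buildLoginUrl("https://nacos.example.com", "/nacos"));
    }
}
```

Note that the check is contains rather than startsWith, so under this assumption a host name that merely includes the text "http" would also skip the prefix.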

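The nacosRestTemplate is passed in when securityProxy is constructed. The first declaration below is the static field used by ServerHttpAgent; the second declaration and the static block are where ConfigHttpClientManager builds the template: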

private static final NacosRestTemplate NACOS_RESTTEMPLATE = ConfigHttpClientManager.getInstance()
        .getNacosRestTemplate();
private static final NacosRestTemplate NACOS_REST_TEMPLATE;

static {
    NACOS_REST_TEMPLATE = HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
    NACOS_REST_TEMPLATE.getInterceptors().add(new LimiterHttpClientRequestInterceptor());
}

The static block mainly adds an interceptor.

The object itself comes from HttpClientBeanHolder.getNacosRestTemplate, which ultimately calls:

com.alibaba.nacos.common.http.AbstractHttpClientFactory#createNacosRestTemplate

@Override
public NacosRestTemplate createNacosRestTemplate() {
    HttpClientConfig httpClientConfig = buildHttpClientConfig();
    final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
    
    // enable ssl
    initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
        @Override
        public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
            clientRequest.setSSLContext(loadSSLContext());
            clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
        }
    }, new TlsFileWatcher.FileChangeListener() {
        @Override
        public void onChanged(String filePath) {
            clientRequest.setSSLContext(loadSSLContext());
        }
    });
    
    return new NacosRestTemplate(assignLogger(), clientRequest);
}

In other words, requestClient is a JdkHttpClientRequest.

Take the get method as an example: com.alibaba.nacos.common.http.client.NacosRestTemplate#get(java.lang.String, com.alibaba.nacos.common.http.param.Header, com.alibaba.nacos.common.http.param.Query, java.lang.reflect.Type)

public <T> HttpRestResult<T> get(String url, Header header, Query query, Type responseType) throws Exception {
    return execute(url, HttpMethod.GET, new RequestHttpEntity(header, query), responseType);
}
private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
            Type responseType) throws Exception {
    URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
    }

    ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
    HttpClientResponse response = null;
    try {
        response = this.requestClient().execute(uri, httpMethod, requestEntity);
        return responseHandler.handle(response);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

It calls the execute method of requestClient:

public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity)
        throws Exception {
    final Object body = requestHttpEntity.getBody();
    final Header headers = requestHttpEntity.getHeaders();
    replaceDefaultConfig(requestHttpEntity.getHttpClientConfig());
    
    HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
    Map<String, String> headerMap = headers.getHeader();
    if (headerMap != null && headerMap.size() > 0) {
        for (Map.Entry<String, String> entry : headerMap.entrySet()) {
            conn.setRequestProperty(entry.getKey(), entry.getValue());
        }
    }
    
    conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis());
    conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis());
    conn.setRequestMethod(httpMethod);
    if (body != null) {
        String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE);
        String bodyStr = JacksonUtils.toJson(body);
        if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {
            Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class);
            bodyStr = HttpUtils.encodingParams(map, headers.getCharset());
        }
        if (bodyStr != null) {
            conn.setDoOutput(true);
            byte[] b = bodyStr.getBytes();
            conn.setRequestProperty("Content-Length", String.valueOf(b.length));
            conn.getOutputStream().write(b, 0, b.length);
            conn.getOutputStream().flush();
            conn.getOutputStream().close();
        }
    }
    conn.connect();
    return new JdkHttpClientResponse(conn);
}

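Below this point it is plain JDK code (HttpURLConnection). That completes the picture of how the Nacos client communicates with the server over the network.

Back to the initialization of NacosConfigService. Right after the ServerHttpAgent is created, its start method is called: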

@Override
public void start() throws NacosException {
    serverListMgr.start();
}

This simply calls the start method of the server list manager:

public synchronized void start() throws NacosException {
    
    if (isStarted || isFixed) {
        return;
    }
    
    GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
    // No new thread here: run() is called directly on the current thread.
    for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
        getServersTask.run();
        try {
            this.wait((i + 1) * 100L);
        } catch (Exception e) {
            LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
        }
    }
    
    if (serverUrls.isEmpty()) {
        LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
                addressServerUrl);
        throw new NacosException(NacosException.SERVER_ERROR,
                "fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
    }
    
    // executor schedules the timer task
    // Run the task every 30 seconds to refresh the server list
    this.executorService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
    isStarted = true;
}

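The task is first run synchronously, retrying until a server list has been obtained, and is then handed to the thread pool to run again every 30 seconds.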
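
The synchronous retry loop with a growing pause can be sketched in isolation. This is an illustration under stated assumptions: the class name, fetchWithRetry, and the flaky supplier are hypothetical, and Thread.sleep is used in place of this.wait inside a synchronized method.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

class InitialFetchRetryDemo {
    /**
     * Calls fetch on the current thread up to maxRetries times, stopping once it
     * has produced a non-empty list. After attempt i (0-based) it pauses for
     * (i + 1) * baseDelayMillis, so the pauses grow linearly.
     */
    static List<String> fetchWithRetry(Supplier<List<String>> fetch, int maxRetries, long baseDelayMillis) {
        List<String> servers = Collections.emptyList();
        for (int i = 0; i < maxRetries && servers.isEmpty(); ++i) {
            List<String> result = fetch.get();
            if (result != null) {
                servers = result;
            }
            try {
                Thread.sleep((i + 1) * baseDelayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical address server that only answers on the third call.
        Supplier<List<String>> flaky = () -> ++attempts[0] < 3
                ? Collections.<String>emptyList()
                : Collections.singletonList("http://127.0.0.1:8848");
        List<String> servers = fetchWithRetry(flaky, 5, 10);
        // prints "[http://127.0.0.1:8848] after 3 attempts"
        System.out.println(servers + " after " + attempts[0] + " attempts");
    }
}
```

As in the original loop, the pause also happens after the successful attempt, because the emptiness check is only evaluated at the top of the next iteration.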

class GetServerListTask implements Runnable {
    
    final String url;
    
    GetServerListTask(String url) {
        this.url = url;
    }
    
    @Override
    public void run() {
        /*
         get serverlist from nameserver
         */
        try {
            updateIfChanged(getApacheServerList(url, name));
        } catch (Exception e) {
            LOGGER.error("[" + name + "][update-serverlist] failed to update serverlist from address server!", e);
        }
    }
}

The task fetches the server list and updates it if anything has changed:

private void updateIfChanged(List<String> newList) {
    if (null == newList || newList.isEmpty()) {
        LOGGER.warn("[update-serverlist] current serverlist from address server is empty!!!");
        return;
    }
    
    List<String> newServerAddrList = new ArrayList<String>();
    for (String server : newList) {
        if (server.startsWith(HTTP) || server.startsWith(HTTPS)) {
            newServerAddrList.add(server);
        } else {
            newServerAddrList.add(HTTP + server);
        }
    }
    
    /*
     no change
     */
    if (newServerAddrList.equals(serverUrls)) {
        return;
    }
    serverUrls = new ArrayList<String>(newServerAddrList);
    iterator = iterator();
    currentServerAddr = iterator.next();
    
    // Using unified event processor, NotifyCenter
    // The server list changed: publish an event
    NotifyCenter.publishEvent(new ServerlistChangeEvent());
    LOGGER.info("[{}] [update-serverlist] serverlist updated to {}", name, serverUrls);
}

The notify center publishes a ServerlistChangeEvent.
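
The normalize, compare, publish sequence can be reproduced without Nacos. In this sketch the class name is hypothetical, a java.util.function.Consumer stands in for NotifyCenter.publishEvent, and the prefix values "http://" and "https://" are assumed for the HTTP and HTTPS constants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ServerListUpdateDemo {
    private static final String HTTP = "http://";   // assumed constant value
    private static final String HTTPS = "https://"; // assumed constant value

    private List<String> serverUrls = new ArrayList<>();
    private final Consumer<List<String>> onChange; // stand-in for NotifyCenter.publishEvent

    ServerListUpdateDemo(Consumer<List<String>> onChange) {
        this.onChange = onChange;
    }

    /** Returns true if the stored list changed and the listener was notified. */
    boolean updateIfChanged(List<String> newList) {
        if (newList == null || newList.isEmpty()) {
            return false; // an empty answer never replaces the current list
        }
        List<String> normalized = new ArrayList<>();
        for (String server : newList) {
            boolean hasScheme = server.startsWith(HTTP) || server.startsWith(HTTPS);
            normalized.add(hasScheme ? server : HTTP + server);
        }
        if (normalized.equals(serverUrls)) {
            return false; // no change, no event
        }
        serverUrls = normalized;
        onChange.accept(new ArrayList<>(normalized));
        return true;
    }

    public static void main(String[] args) {
        ServerListUpdateDemo manager = new ServerListUpdateDemo(urls -> System.out.println("changed: " + urls));
        System.out.println(manager.updateIfChanged(Arrays.asList("127.0.0.1:8848")));        // event, then true
        System.out.println(manager.updateIfChanged(Arrays.asList("http://127.0.0.1:8848"))); // false
        System.out.println(manager.updateIfChanged(new ArrayList<>()));                      // false
    }
}
```

Normalizing before comparing is what makes the second call a no-op: the same address with and without the scheme is treated as one server, so no spurious change event is published.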

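To summarize:

  1. Creating a ConfigService first creates a ServerHttpAgent. During its construction the client authenticates against the server and starts a scheduled task that refreshes the token so that it does not expire.
  2. Calling start on ServerHttpAgent mainly monitors the server list for changes and publishes a ServerlistChangeEvent when a change is detected.

The next part continues with how ClientWorker is created.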
