motan源码分析五:cluster相关

x33g5p2x  于2021-12-21 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(145)

上一章我们分析了客户端调用服务端相关的源码,但是到了cluster里面的部分我们就没有分析了,本章将深入分析cluster和它的相关支持类。

1.clustersupport的创建过程,上一章的ReferConfig的initRef()方法中调用了相关的创建代码:

for(Iterator iterator =protocols.iterator(); iterator.hasNext();)
        {
            ProtocolConfig protocol =(ProtocolConfig)iterator.next();
            LoggerUtil.info((new StringBuilder("ProtocolConfig's")).append(protocol.getName()).toString());
            Map params = newHashMap();
            params.put(URLParamType.nodeType.getName(), "referer");
            params.put(URLParamType.version.getName(), URLParamType.version.getValue());
            params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
            collectConfigParams(params, newAbstractConfig[] {
                protocol, basicReferer, extConfig, this});
            collectMethodConfigParams(params, getMethods());
            URL refUrl = new URL(protocol.getName(), localIp, 0, interfaceClass.getName(), params);
            ClusterSupport clusterSupport =createClusterSupport(refUrl, configHandler, registryUrls);//创建clustersupport
            clusterSupports.add(clusterSupport);
            clusters.add(clusterSupport.getCluster());//获取对应的cluster
            proxy = proxy != null ?proxy : refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue());
        }

    privateClusterSupport createClusterSupport(URL refUrl, ConfigHandler configHandler, List registryUrls)
    {
        List regUrls = newArrayList();
        if(StringUtils.isNotBlank(directUrl) || "injvm".equals(refUrl.getProtocol()))
        {
            URL regUrl = new URL("local", "127.0.0.1", 0, com/weibo/api/motan/registry/RegistryService.getName());
            if(StringUtils.isNotBlank(directUrl))
            {
                StringBuilder duBuf = new StringBuilder(128);
                String dus[] =MotanConstants.COMMA_SPLIT_PATTERN.split(directUrl);
                String as[];
                int j = (as =dus).length;
                for(int i = 0; i < j; i++)
                {
                    String du =as[i];
                    if(du.contains(":"))
                    {
                        String hostPort[] = du.split(":");
                        URL durl =refUrl.createCopy();
                        durl.setHost(hostPort[0].trim());
                        durl.setPort(Integer.parseInt(hostPort[1].trim()));
                        durl.addParameter(URLParamType.nodeType.getName(), "service");
                        duBuf.append(StringTools.urlEncode(durl.toFullStr())).append(",");
                    }
                }

                if(duBuf.length() > 0)
                {
                    duBuf.deleteCharAt(duBuf.length() - 1);
                    regUrl.addParameter(URLParamType.directUrl.getName(), duBuf.toString());
                }
            }
            regUrls.add(regUrl);
        } else//走注册中心的方式{
            if(registryUrls == null ||registryUrls.isEmpty())
                throw new IllegalStateException(String.format("No registry to reference %s on the consumer %s , please config <motan:registry address=\"...\" /> in your spring config.", newObject[] {
                    interfaceClass, "127.0.0.1"}));
            URL url;
            for(Iterator iterator =registryUrls.iterator(); iterator.hasNext(); regUrls.add(url.createCopy()))
                url =(URL)iterator.next();

        }
        URL url;
        for(Iterator iterator1 =regUrls.iterator(); iterator1.hasNext(); url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr())))
            url =(URL)iterator1.next();

        returnconfigHandler.buildClusterSupport(interfaceClass, regUrls);//调用simpleconfighandler的创建clustersupport方法
    }

    public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL>registryUrls) {
        ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);//创建cluster支持类,将业务接口和注册中心信息传递进去
        clusterSupport.init();//初始化

        returnclusterSupport;
    }

2.clustersupport的init和prepare方法

public  void  init() {
 
     prepareCluster();
 
     URL subUrl = toSubscribeUrl(url);
     for  (URL ru : registryUrls) { //循环注册中心的url
 
         String directUrlStr = ru.getParameter(URLParamType.directUrl.getName());
         // 如果有directUrl,直接使用这些directUrls进行初始化,不用到注册中心discover
         if  (StringUtils.isNotBlank(directUrlStr)) {
             List<URL> directUrls = parseDirectUrls(directUrlStr);
             if  (!directUrls.isEmpty()) {
                 notify(ru, directUrls);
                 LoggerUtil.info( "Use direct urls, refUrl={}, directUrls={}" , url, directUrls);
                 continue ;
             }
         }
 
         // client 注册自己,同时订阅service列表
         Registry registry = getRegistry(ru); //获取zookeeper的注册中心
         registry.subscribe(subUrl,  this ); //注册自己并订阅服务
     }
 
     boolean  check = Boolean.parseBoolean(url.getParameter(URLParamType.check.getName(), URLParamType.check.getValue()));
     if  (!CollectionUtil.isEmpty(cluster.getReferers()) || !check) {
         cluster.init(); //初始化集群
         if  (CollectionUtil.isEmpty(cluster.getReferers()) && !check) {
             LoggerUtil.warn(String.format( "refer:%s" ,  this .url.getPath() +  "/"  +  this .url.getVersion()),  "No services" );
         }
         return ;
     }
 
     throw  new  MotanFrameworkException(String.format( "ClusterSupport No service urls for the refer:%s, registries:%s" ,
             this .url.getIdentity(), registryUrls), MotanErrorMsgConstant.SERVICE_UNFOUND);
}
 
private  void  prepareCluster() {
     String clusterName = url.getParameter(URLParamType.cluster.getName(), URLParamType.cluster.getValue()); //集群名称
     String loadbalanceName = url.getParameter(URLParamType.loadbalance.getName(), URLParamType.loadbalance.getValue()); //负载均衡名称
     String haStrategyName = url.getParameter(URLParamType.haStrategy.getName(), URLParamType.haStrategy.getValue()); //ha高可用名称
 
     cluster = ExtensionLoader.getExtensionLoader(Cluster. class ).getExtension(clusterName); //获取具体的集群对象
     LoadBalance<T> loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance. class ).getExtension(loadbalanceName); //获取具体的负载均衡方式,目前motan支持6种负载方式
     HaStrategy<T> ha = ExtensionLoader.getExtensionLoader(HaStrategy. class ).getExtension(haStrategyName); //获取高可用的方式,目前支持两种failfast和failover方式
     cluster.setLoadBalance(loadBalance);
     cluster.setHaStrategy(ha);
     cluster.setUrl(url);
}

3.负载均衡,motan支持6种方式,分别是:轮训、随机、hash、本地服务优先、权重可配置、低并发优先,具体代码可见com.weibo.api.motan.cluster.loadbalance目录,本文我们主要看一下轮训的方式:

public  class  RoundRobinLoadBalance<T>  extends  AbstractLoadBalance<T> {
 
     private  AtomicInteger idx =  new  AtomicInteger( 0 );
 
     @Override
     protected  Referer<T> doSelect(Request request) {
         List<Referer<T>> referers = getReferers(); //获取所有服务器的引用
 
         int  index = idx.incrementAndGet(); //自增
         for  ( int  i =  0 ; i < referers.size(); i++) {
             Referer<T> ref = referers.get((i + index) % referers.size()); //利用自增数去模,达到轮训的目的
             if  (ref.isAvailable()) {
                 return  ref;
             }
         }
         return  null ;
     }
 
     @Override
     protected  void  doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
         List<Referer<T>> referers = getReferers();
 
         int  index = idx.incrementAndGet();
         for  ( int  i =  0 ; i < referers.size(); i++) {
             Referer<T> referer = referers.get((i + index) % referers.size());
             if  (referer.isAvailable()) {
                 refersHolder.add(referer);
             }
         }
     }
}

4.motan支持failfast和failover两种方式,failfast只调用一次,如果失败则直接返回失败,failover循环调用若干次,直到成功或循环结束后

public  Response call(Request request, LoadBalance<T> loadBalance) {
 
     List<Referer<T>> referers = selectReferers(request, loadBalance); //获取所有的引用
     if  (referers.isEmpty()) {
         throw  new  MotanServiceException(String.format( "FailoverHaStrategy No referers for request:%s, loadbalance:%s" , request,
                 loadBalance));
     }
     URL refUrl = referers.get( 0 ).getUrl();
     // 先使用method的配置
     int  tryCount =
             refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
                     URLParamType.retries.getIntValue()); //获取重试次数
     // 如果有问题,则设置为不重试
     if  (tryCount <  0 ) {
         tryCount =  0 ;
     }
 
     for  ( int  i =  0 ; i <= tryCount; i++) {
         Referer<T> refer = referers.get(i % referers.size()); //循环调用
         try  {
             request.setRetries(i);
             return  refer.call(request);
         }  catch  (RuntimeException e) {
             // 对于业务异常,直接抛出
             if  (ExceptionUtil.isBizException(e)) {
                 throw  e; //业务异常退出调用
             }  else  if  (i >= tryCount) {
                 throw  e;
             }
             LoggerUtil.warn(String.format( "FailoverHaStrategy Call false for request:%s error=%s" , request, e.getMessage()));
         }
     }
 
     throw  new  MotanFrameworkException( "FailoverHaStrategy.call should not come here!" );
}

本章知识点总结:

1.一个cluster有一个cluster的支持类,有一个ha,有一个loadbalance;

2.motan支持6种负载均衡方式;

3.motan支持failover的ha方式;

相关文章