motan源码分析一:服务发布及注册

x33g5p2x  于2021-12-21 转载在 其他  
字(12.8k)|赞(0)|评价(0)|浏览(182)

motan是新浪微博开源的服务治理框架,具体介绍请看:http://tech.sina.com.cn/i/2016-05-10/doc-ifxryhhh1869879.shtml.

本系列的文章将分析它的底层源码,分析的源码版本为:0.1.2。第一篇文章将以服务的发布和注册开始,注册服务使用zookeeper来分析。源码地址:https://github.com/weibocom/motan

本文涉及到的主要类和接口:MotanApiExportDemo、MotanDemoService、MotanDemoServiceImpl、ServiceConfig、RegistryConfig、ProtocolConfig、DefaultProvider、ZookeeperRegistryFactory、ZookeeperRegistry、SimpleConfigHandler、ProtocolFilterDecorator等。

1.首先来看demo源码:MotanApiExportDemo 

  demo中先后创建了ServiceConfig、RegistryConfig和ProtocolConfig相关的对象,其中ServiceConfig是我们提供服务的相关配置(每个服务一个配置,例如一个服务接口一个配置,本文中的具体服务是:MotanDemoServiceImpl)、RegistryConfig是注册中心相关的配置信息、ProtocolConfig是应用协议相关的配置(在客户端还负责集群相关的配置)。

ServiceConfig<MotanDemoService> motanDemoService =  new  ServiceConfig<MotanDemoService>();
 
// 设置接口及实现类
motanDemoService.setInterface(MotanDemoService. class ); //设置服务接口,客户端在rpc调用时,会在协议中传递接口名称,从而实现与具体实现类一一对应
motanDemoService.setRef( new  MotanDemoServiceImpl()); //设置接口实现类,实际的业务代码
 
// 配置服务的group以及版本号
motanDemoService.setGroup( "motan-demo-rpc" ); //服务所属的组
motanDemoService.setVersion( "1.0" );
 
// 配置ZooKeeper注册中心
RegistryConfig zookeeperRegistry =  new  RegistryConfig();
zookeeperRegistry.setRegProtocol( "zookeeper" ); //使用zookeeper作为注册中心
zookeeperRegistry.setAddress( "127.0.0.1:2181" ); //zookeeper的连接地址
motanDemoService.setRegistry(zookeeperRegistry);
 
// 配置RPC协议
ProtocolConfig protocol =  new  ProtocolConfig();
protocol.setId( "motan" ); //使用motan应用协议
protocol.setName( "motan" );
motanDemoService.setProtocol(protocol);
 
motanDemoService.setExport( "motan:8010" ); //本服务的监控端口号是8010
motanDemoService.export(); //发布及在zookeeper上注册此服务

2.从上面的代码可知ServiceConfig类是服务的发布及注册的核心是motanDemoService.export()方法,我们来看一下此方法的实现细节:

public  synchronized  void  export()
{
     if (exported.get())
     {
         LoggerUtil.warn(String.format( "%s has already been expoted, so ignore the export request!" ,  new  Object[] {
             interfaceClass.getName()
         }));
         return ;
     }
     checkInterfaceAndMethods(interfaceClass, methods);
     List registryUrls = loadRegistryUrls(); //加载注册中心的url,支持多个注册中心
     if (registryUrls ==  null  || registryUrls.size() ==  0 )
         throw  new  IllegalStateException(( new  StringBuilder( "Should set registry config for service:" )).append(interfaceClass.getName()).toString());
     Map protocolPorts = getProtocolAndPort();
     ProtocolConfig protocolConfig;
     Integer port;
     for (Iterator iterator = protocols.iterator(); iterator.hasNext(); doExport(protocolConfig, port.intValue(), registryUrls)) //发布服务
     {
         protocolConfig = (ProtocolConfig)iterator.next();
         port = (Integer)protocolPorts.get(protocolConfig.getId());
         if (port ==  null )
             throw  new  MotanServiceException(String.format( "Unknow port in service:%s, protocol:%s" ,  new  Object[] {
                 interfaceClass.getName(), protocolConfig.getId()
             }));
     }
 
     afterExport();
}

方法中调用了doexport和afterExport方法:

private void doExport(ProtocolConfig protocolConfig, intport, List registryURLs)
    {
        String protocolName =protocolConfig.getName();//获取协议名称,此处为motan
        if(protocolName == null || protocolName.length() == 0)
            protocolName =URLParamType.protocol.getValue();
        String hostAddress =host;//本机地址
        if(StringUtils.isBlank(hostAddress) && basicServiceConfig != null)
            hostAddress =basicServiceConfig.getHost();
        if(NetUtils.isInvalidLocalHost(hostAddress))
            hostAddress =getLocalHostAddress(registryURLs);
        Map map = newHashMap();
        map.put(URLParamType.nodeType.getName(), "service");
        map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
        collectConfigParams(map, newAbstractConfig[] {
            protocolConfig, basicServiceConfig, extConfig, this});
        collectMethodConfigParams(map, getMethods());
        URL serviceUrl = newURL(protocolName, hostAddress, port, interfaceClass.getName(), map);//组装serviceUrl信息
        if(serviceExists(serviceUrl))//判断服务之前是否已经加载过
        {
            LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", newObject[] {
                interfaceClass.getName(), serviceUrl.getIdentity()
            }));
            throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ", newObject[] {
                interfaceClass.getName(), serviceUrl.getIdentity()
            }), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);//抛出同名服务异常
        }
        List urls = newArrayList();
        if("injvm".equals(protocolConfig.getId()))
        {
            URL localRegistryUrl = null;
            for(Iterator iterator2 =registryURLs.iterator(); iterator2.hasNext();)
            {
                URL ru =(URL)iterator2.next();
                if("local".equals(ru.getProtocol()))
                {
                    localRegistryUrl =ru.createCopy();
                    break;
                }
            }

            if(localRegistryUrl == null)
                localRegistryUrl = new URL("local", hostAddress, 0, com/weibo/api/motan/registry/RegistryService.getName());
            urls.add(localRegistryUrl);
        } else{
            URL ru;
            for(Iterator iterator =registryURLs.iterator(); iterator.hasNext(); urls.add(ru.createCopy()))
                ru =(URL)iterator.next();

        }
        URL u;
        for(Iterator iterator1 =urls.iterator(); iterator1.hasNext(); registereUrls.add(u.createCopy()))
        {
            u =(URL)iterator1.next();
            u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
        }

        ConfigHandler configHandler = (ConfigHandler)ExtensionLoader.getExtensionLoader(com/weibo/api/motan/config/handler/ConfigHandler).getExtension("default");//使用spi机制加载SimpleConfigHandler
        exporters.add(configHandler.export(interfaceClass, ref, urls));//调用SimpleConfigHandler的export方法
        initLocalAppInfo(serviceUrl);
    }

    private voidafterExport()
    {
        exported.set(true);
        Exporter ep;
        for(Iterator iterator =exporters.iterator(); iterator.hasNext(); existingServices.add(ep.getProvider().getUrl().getIdentity()))
            ep =(Exporter)iterator.next();

    }

再来看一下SimpleConfigHandler的export方法

public  <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
 
     String serviceStr = StringTools.urlDecode(registryUrls.get( 0 ).getParameter(URLParamType.embed.getName()));
     URL serviceUrl = URL.valueOf(serviceStr);
 
     // export service
     // 利用protocol decorator来增加filter特性
     String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
     Protocol protocol =  new  ProtocolFilterDecorator(ExtensionLoader.getExtensionLoader(Protocol. class ).getExtension(protocolName)); //对于Protoclo对象增强filter
     Provider<T> provider =  new  DefaultProvider<T>(ref, serviceUrl, interfaceClass);服务的代理提供者,包装ref的服务
     Exporter<T> exporter = protocol.export(provider, serviceUrl); //发布服务,将代理对象provider与具体的serviceUrl关联
 
     // register service
     register(registryUrls, serviceUrl);
 
     return  exporter;
}

3.下面我们来看一下,motan如何对filter进行相应的增强处理

public  class  ProtocolFilterDecorator  implements  Protocol {  //实现Protocol的接口,联系到上文中使用此类对实际的Protocol进行包装
 
     private  Protocol protocol;
 
     public  ProtocolFilterDecorator(Protocol protocol) {
         if  (protocol ==  null ) {
             throw  new  MotanFrameworkException( "Protocol is null when construct ProtocolFilterDecorator" ,
                     MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
         }
         this .protocol = protocol; //给实际的Protocol进行赋值
     }
 
     @Override
     public  <T> Exporter<T> export(Provider<T> provider, URL url) {
         return  protocol.export(decorateWithFilter(provider, url), url);发布服务时,调用filter增强处理方法
     }
 
     private  <T> Provider<T> decorateWithFilter(Provider<T> provider, URL url) {
         List<Filter> filters = getFilters(url, MotanConstants.NODE_TYPE_SERVICE); //获取实际需要增强的filter
         if  (filters ==  null  || filters.size() ==  0 ) {
             return  provider;
         }
         Provider<T> lastProvider = provider;
         for  (Filter filter : filters) { //对于代理对象provider进行包装,包装成一个provider链,返回最后一个provider
             final  Filter f = filter;
             final  Provider<T> lp = lastProvider;
             lastProvider =  new  Provider<T>() {
                 @Override
                 public  Response call(Request request) {
                     return  f.filter(lp, request); //对于后面调用的call方法时,首先调用最外层的filter,最后再调用实际的provider的call方法
                 }
 
                 @Override
                 public  String desc() {
                     return  lp.desc();
                 }
 
                 @Override
                 public  void  destroy() {
                     lp.destroy();
                 }
 
                 @Override
                 public  Class<T> getInterface() {
                     return  lp.getInterface();
                 }
 
                 @Override
                 public  URL getUrl() {
                     return  lp.getUrl();
                 }
 
                 @Override
                 public  void  init() {
                     lp.init();
                 }
 
                 @Override
                 public  boolean  isAvailable() {
                     return  lp.isAvailable();
                 }
             };
         }
         return  lastProvider;
     }
 
     private  List<Filter> getFilters(URL url, String key) {
 
         // load default filters
         List<Filter> filters =  new  ArrayList<Filter>();
         List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter. class ).getExtensions(key); //使用spi机制初始化filer对象
         if  (defaultFilters !=  null  && defaultFilters.size() >  0 ) {
             filters.addAll(defaultFilters);
         }
 
         // add filters via "filter" config
         String filterStr = url.getParameter(URLParamType.filter.getName());
         if  (StringUtils.isNotBlank(filterStr)) {
             String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
             for  (String fn : filterNames) {
                 addIfAbsent(filters, fn);
             }
         }
 
         // add filter via other configs, like accessLog and so on
         boolean  accessLog = url.getBooleanParameter(URLParamType.accessLog.getName(), URLParamType.accessLog.getBooleanValue());
         if  (accessLog) {
             addIfAbsent(filters, AccessLogFilter. class .getAnnotation(SpiMeta. class ).name());
         }
 
         // sort the filters
         Collections.sort(filters,  new  ActivationComparator<Filter>());
         Collections.reverse(filters);
         return  filters;
     }
}

4.服务发布完成后,需要像注册中心注册此服务

private  void  register(List<URL> registryUrls, URL serviceUrl) {
 
     for  (URL url : registryUrls) { //循环便利多个注册中心的信息
         // 根据check参数的设置,register失败可能会抛异常,上层应该知晓
         RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory. class ).getExtension(url.getProtocol()); //文中使用的是zookeeper
         if  (registryFactory ==  null ) {
             throw  new  MotanFrameworkException( new  MotanErrorMsg( 500 , MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
                     "register error! Could not find extension for registry protocol:"  + url.getProtocol()
                             +  ", make sure registry module for "  + url.getProtocol() +  " is in classpath!" ));
         }
         Registry registry = registryFactory.getRegistry(url); //获取registry
         registry.register(serviceUrl); //将服务注册到zookeeper,也就是把节点信息写入到zookeeper中
     }
}

我们来看一下zookeeper注册中心的工厂类:每个Registry都需要独立维护一个ZkClient与zookeeper的链接

@SpiMeta (name =  "zookeeper" )
public  class  ZookeeperRegistryFactory  extends  AbstractRegistryFactory {
 
     @Override
     protected  Registry createRegistry(URL registryUrl) {
         try  {
             int  timeout = registryUrl.getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
             int  sessionTimeout =
                     registryUrl.getIntParameter(URLParamType.registrySessionTimeout.getName(),
                             URLParamType.registrySessionTimeout.getIntValue());
             ZkClient zkClient =  new  ZkClient(registryUrl.getParameter( "address" ), sessionTimeout, timeout); //创建zookeeper的客户端
             return  new  ZookeeperRegistry(registryUrl, zkClient); //创建实际的Registry
         }  catch  (ZkException e) {
             LoggerUtil.error( "[ZookeeperRegistry] fail to connect zookeeper, cause: "  + e.getMessage());
             throw  e;
         }
     }
}

我们再来分析ZookeeperRegistry中的代码

publicZookeeperRegistry(URL url, ZkClient client) {
        super(url);
        this.zkClient =client;
        IZkStateListener zkStateListener = newIZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throwsException {
                //do nothing
}

            @Override
            public void handleNewSession() throwsException {//响应zkClient的事件
                LoggerUtil.info("zkRegistry get new session notify.");
                reconnectService();//重新注册服务
                reconnectClient();
            }
        };
        zkClient.subscribeStateChanges(zkStateListener);
    }
    private voidreconnectService() {
        Collection<URL> allRegisteredServices =getRegisteredServiceUrls();
        if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
            try{
                serverLock.lock();
                for(URL url : getRegisteredServiceUrls()) {
                    doRegister(url);//注册
                }
                LoggerUtil.info("[{}] reconnect: register services {}", registryClassName, allRegisteredServices);

                for(URL url : availableServices) {
                    if (!getRegisteredServiceUrls().contains(url)) {
                        LoggerUtil.warn("reconnect url not register. url:{}", url);
                        continue;
                    }
                    doAvailable(url);//标识服务可以提供服务
                }
                LoggerUtil.info("[{}] reconnect: available services {}", registryClassName, availableServices);
            } finally{
                serverLock.unlock();
            }
        }
    }
    protected voiddoRegister(URL url) {
        try{
            serverLock.lock();
            //防止旧节点未正常注销
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        } catch(Throwable e) {
            throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
        } finally{
            serverLock.unlock();
        }
    }
    protected voiddoAvailable(URL url) {
        try{
            serverLock.lock();
            if (url == null) {
                availableServices.addAll(getRegisteredServiceUrls());
                for(URL u : getRegisteredServiceUrls()) {
                    removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                    removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                    createNode(u, ZkNodeType.AVAILABLE_SERVER);
                }
            } else{
                availableServices.add(url);
                removeNode(url, ZkNodeType.AVAILABLE_SERVER);
                removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
                createNode(url, ZkNodeType.AVAILABLE_SERVER);
            }
        } finally{
            serverLock.unlock();
        }
    }
private  void  createNode(URL url, ZkNodeType nodeType) {
     String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
     if  (!zkClient.exists(nodeTypePath)) {
         zkClient.createPersistent(nodeTypePath,  true ); //对于服务的标识信息,创建持久化节点
     }
     zkClient.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr()); //对于服务的ip和端口号信息使用临时节点,当服务断了后,zookeeper自动摘除目标服务器
}

    本文分析了motan的服务发布及注册到zookeeper的流程相关的源码,主要涉及到的知识点:

1.利用相关的配置对象进行信息的存储及传递;

2.利用provider对具体的业务类进行封装代理;

3.利用filter链的结构,来包装实际的provider,把所有的过滤器都处理完毕后,最后调用实际的业务类,大家可以想象一下aop相关的原理,有些类似;

4.代码中大量使用jdk的标准spi技术进行类的加载;

5.支持多个注册中心,也就是同一个服务可以注册到不同的注册中心上,每个registry对应一个具体的zkclient;

6.利用了zookeeper的临时节点来维护服务器的host和port信息;

7.支持多个服务发布到同一个端口,在本文中并没分析netty使用相关的代码,后面会分析到。

相关文章