motan源码分析十一:部分特性

x33g5p2x  于2021-12-21 转载在 其他  
字(3.5k)|赞(0)|评价(0)|浏览(218)

本章将描述motan部分的特性并对源码进行分析。

1.requestid的维护,使用了当前时间左移20位,再和一个自增变量组合

public classRequestIdGenerator {
    protected static final AtomicLong offset = new AtomicLong(0);
    protected static final int BITS = 20;
    protected static final long MAX_COUNT_PER_MILLIS = 1 <<BITS;

    /**
     * 获取 requestId
     * 
     * @return
     */
    public static longgetRequestId() {
        long currentTime =System.currentTimeMillis();
        long count =offset.incrementAndGet();
        while(count >=MAX_COUNT_PER_MILLIS){
            synchronized (RequestIdGenerator.class){
                if(offset.get() >=MAX_COUNT_PER_MILLIS){
                    offset.set(0);
                }
            }
            count =offset.incrementAndGet();
        }
        return (currentTime << BITS) +count;
    }

    public static longgetRequestIdFromClient() {
        //TODO 上下文 requestid
        return 0;

    }

}

2.限流,motan支持简单的限流,是利用filter来实现的

@SpiMeta (name =  "active" )
@Activation (sequence =  1 )
public  class  ActiveLimitFilter  implements  Filter {
 
     @Override
     public  Response filter(Caller<?> caller, Request request) {
         int  maxAcvitivyCount = caller.getUrl().getIntParameter(URLParamType.actives.getName(), URLParamType.actives.getIntValue());
         if  (maxAcvitivyCount >  0 ) {
             int  activeCount = RpcStats.getServiceStat(caller.getUrl()).getActiveCount();
             if  (activeCount >= maxAcvitivyCount) {
                 throw  new  MotanServiceException(String.format( "Request(%s) active count exceed the limit (%s), referer:%s" , request,
                         maxAcvitivyCount, caller.getUrl()), MotanErrorMsgConstant.SERVICE_REJECT);
             }
         }
 
         long  startTime = System.currentTimeMillis();
         RpcStats.beforeCall(caller.getUrl(), request);
         try  {
             Response rs = caller.call(request);
             RpcStats.afterCall(caller.getUrl(), request,  true , System.currentTimeMillis() - startTime);
             return  rs;
         }  catch  (RuntimeException re) {
             RpcStats.afterCall(caller.getUrl(), request,  false , System.currentTimeMillis() - startTime);
             throw  re;
         }
 
     }
 
}

3.对于连续失败的client进行不可用操作

voidincrErrorCount() {
        long count =errorCount.incrementAndGet();

        //如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用
        if (count >= maxClientConnection &&state.isAliveState()) {
            synchronized (this) {
                count =errorCount.longValue();

                if (count >= maxClientConnection &&state.isAliveState()) {
                    LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " "
                            +url.getServerPortStr());
                    state =ChannelState.UNALIVE;
                }
            }
        }
    }

    voidresetErrorCount() {
        errorCount.set(0);

        if(state.isAliveState()) {
            return;
        }

        synchronized (this) {
            if(state.isAliveState()) {
                return;
            }

            //如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略
            if(state.isUnAliveState()) {
                long count =errorCount.longValue();

                //过程中有其他并发更新errorCount的,因此这里需要进行一次判断
                if (count <maxClientConnection) {
                    state =ChannelState.ALIVE;
                    LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " "
                            +url.getServerPortStr());
                }
            }
        }
    }

4.支持多注册中心,因此cluster的refer集合是所有注册中心包含服务器的集合,如果同一个服务器在不同的注册中心注册,则cluster中当作两个服务器

5.服务端的采用boss线程池+工作线程池+业务线程池的处理方式

private  final  static  ChannelFactory channelFactory =  new  NioServerSocketChannelFactory( //boss线程池和工作线程池,主要负责接收消息
         Executors.newCachedThreadPool( new  DefaultThreadFactory( "nettyServerBoss" ,  true )),
         Executors.newCachedThreadPool( new  DefaultThreadFactory( "nettyServerWorker" ,  true )));
 
     private  StandardThreadExecutor standardThreadExecutor =  null ; //业务线程池,负责具体的业务处理
 
     standardThreadExecutor = (standardThreadExecutor !=  null  && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
             :  new  StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize,
                     new  DefaultThreadFactory( "NettyServer-"  + url.getServerPortStr(),  true ));
     standardThreadExecutor.prestartAllCoreThreads();
 
    final  NettyChannelHandler handler =  new  NettyChannelHandler(NettyServer. this , messageHandler,
             standardThreadExecutor); //handler使用业务线程池今天处理具体的业务
上一篇:十:流量切换

相关文章