本章将描述motan部分的特性并对源码进行分析。
1.requestid的维护,使用了当前时间左移20位,再和一个自增变量组合
public classRequestIdGenerator {
protected static final AtomicLong offset = new AtomicLong(0);
protected static final int BITS = 20;
protected static final long MAX_COUNT_PER_MILLIS = 1 <<BITS;
/**
* 获取 requestId
*
* @return
*/
public static longgetRequestId() {
long currentTime =System.currentTimeMillis();
long count =offset.incrementAndGet();
while(count >=MAX_COUNT_PER_MILLIS){
synchronized (RequestIdGenerator.class){
if(offset.get() >=MAX_COUNT_PER_MILLIS){
offset.set(0);
}
}
count =offset.incrementAndGet();
}
return (currentTime << BITS) +count;
}
public static longgetRequestIdFromClient() {
//TODO 上下文 requestid
return 0;
}
}
2.限流,motan支持简单的限流,是利用filter来实现的
@SpiMeta (name = "active" )
@Activation (sequence = 1 )
public class ActiveLimitFilter implements Filter {
@Override
public Response filter(Caller<?> caller, Request request) {
int maxAcvitivyCount = caller.getUrl().getIntParameter(URLParamType.actives.getName(), URLParamType.actives.getIntValue());
if (maxAcvitivyCount > 0 ) {
int activeCount = RpcStats.getServiceStat(caller.getUrl()).getActiveCount();
if (activeCount >= maxAcvitivyCount) {
throw new MotanServiceException(String.format( "Request(%s) active count exceed the limit (%s), referer:%s" , request,
maxAcvitivyCount, caller.getUrl()), MotanErrorMsgConstant.SERVICE_REJECT);
}
}
long startTime = System.currentTimeMillis();
RpcStats.beforeCall(caller.getUrl(), request);
try {
Response rs = caller.call(request);
RpcStats.afterCall(caller.getUrl(), request, true , System.currentTimeMillis() - startTime);
return rs;
} catch (RuntimeException re) {
RpcStats.afterCall(caller.getUrl(), request, false , System.currentTimeMillis() - startTime);
throw re;
}
}
}
3.对于连续失败的client进行不可用操作
voidincrErrorCount() {
long count =errorCount.incrementAndGet();
//如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用
if (count >= maxClientConnection &&state.isAliveState()) {
synchronized (this) {
count =errorCount.longValue();
if (count >= maxClientConnection &&state.isAliveState()) {
LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " "
+url.getServerPortStr());
state =ChannelState.UNALIVE;
}
}
}
}
voidresetErrorCount() {
errorCount.set(0);
if(state.isAliveState()) {
return;
}
synchronized (this) {
if(state.isAliveState()) {
return;
}
//如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略
if(state.isUnAliveState()) {
long count =errorCount.longValue();
//过程中有其他并发更新errorCount的,因此这里需要进行一次判断
if (count <maxClientConnection) {
state =ChannelState.ALIVE;
LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " "
+url.getServerPortStr());
}
}
}
}
4.支持多注册中心,因此cluster的refer集合是所有注册中心包含服务器的集合,如果同一个服务器在不同的注册中心注册,则cluster中当作两个服务器
5.服务端的采用boss线程池+工作线程池+业务线程池的处理方式
private final static ChannelFactory channelFactory = new NioServerSocketChannelFactory( //boss线程池和工作线程池,主要负责接收消息
Executors.newCachedThreadPool( new DefaultThreadFactory( "nettyServerBoss" , true )),
Executors.newCachedThreadPool( new DefaultThreadFactory( "nettyServerWorker" , true )));
private StandardThreadExecutor standardThreadExecutor = null ; //业务线程池,负责具体的业务处理
standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
: new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize,
new DefaultThreadFactory( "NettyServer-" + url.getServerPortStr(), true ));
standardThreadExecutor.prestartAllCoreThreads();
final NettyChannelHandler handler = new NettyChannelHandler(NettyServer. this , messageHandler,
standardThreadExecutor); //handler使用业务线程池今天处理具体的业务
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/a1439226817/article/details/68483506
内容来源于网络,如有侵权,请联系作者删除!