xxl-job执行器端接收调度器任务执行的原理

x33g5p2x  于2021-12-21 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(457)

调度中心通过netty发送调度请求,执行器有个专门的处理类ExecutorBizImpl去响应调度请求。

实际上调度中心和执行器部署在不同的服务器上,调度中心调用executorBiz.run(triggerParam)时并不会直接执行执行器上的处理类ExecutorBizImpl(ExecutorBizImpl实现了ExecutorBiz接口)。调度中心持有的executorBiz是一个JDK动态代理对象,调用executorBiz.run(triggerParam)时会触发InvocationHandler的invoke方法,由invoke方法通过netty向执行器发送请求。

/**
 * Builds a JDK dynamic proxy for the interface held in {@code this.iface}.
 * Each call on the proxy is packed into an {@code XxlRpcRequest} and sent to
 * the remote address instead of being executed locally.
 *
 * NOTE(review): this is an abbreviated excerpt. {@code finalAddress} is not
 * declared in the visible code, and only the SYNC call type is shown.
 */
public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Capture what is needed to describe the invoked method to the remote side.
                String className = method.getDeclaringClass().getName();
                // Not used anywhere in the visible part of this excerpt.
                String varsion_ = XxlRpcReferenceBean.this.version;
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                Object[] parameters = args;
                    // Only proceed when a non-blank target address is available.
                    if (finalAddress != null && finalAddress.trim().length() != 0) {
                        // Build the request: unique id, creation time, access token and the method description.
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(XxlRpcReferenceBean.this.accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        XxlRpcFutureResponse futureResponse;
                        if (CallType.SYNC == XxlRpcReferenceBean.this.callType) {
                            // SYNC: create the future first, send the request, then wait on the future up to the configured timeout.
                            futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);

                            Object var31;
                            try {
                              // Send the request to the executor via netty.
                                XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
                                // A non-null error message in the response is surfaced as an XxlRpcException.
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }

                                var31 = xxlRpcResponse.getResult();
                            } catch (Exception var21) {
                                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                                // Rethrow XxlRpcException unchanged; wrap any other exception in one.
                                throw (Throwable)(var21 instanceof XxlRpcException ? var21 : new XxlRpcException(var21));
                            } finally {
                                // Always release the future's registration, on success or failure.
                                futureResponse.removeInvokerFuture();
                            }

                            return var31;
                        }
                        // Excerpt ends here: the other call types and the closing braces are omitted.

执行器的ExecutorBizImpl去处理任务执行请求,这里只介绍Bean模式任务方式(在此方式下Handler才能够使用)

public class ExecutorBizImpl implements ExecutorBiz {
   
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // 获取任务线程
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        //通过jobThread 获取任务Handler
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // 通过在任务管理添加的JobHandler参数获取对应的JobHandler类实例
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // 在这里判断通过jobId获取的jobHandler和通过JonHandler参数获取的newJobHandler是不是同一个
            //如果不是就要重新获取
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }
        }
     //在这里判断通过jobId获取的jobHandler和通过JonHandler参数获取的newJobHandler是不是同一个
     //重新实例化jobThread,并开启线程
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    //将调度中心发过来的请求参数都入队列
      ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    }

重新实例化jobThread,并开启线程

// Job id -> the job thread currently registered for that job.
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

/**
 * Creates and starts a job thread for the given job, registers it under the
 * job id, and stops whichever thread was registered under that id before.
 *
 * @param jobId           id of the job the thread serves
 * @param handler         handler the new thread is constructed with
 * @param removeOldReason reason passed to the replaced thread's toStop, if one existed
 * @return the newly created and started thread
 */
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
    JobThread created = new JobThread(jobId, handler);
    created.start();

    // Map.put hands back the previous mapping, i.e. the thread being replaced.
    JobThread replaced = jobThreadRepository.put(jobId, created);
    if (replaced == null) {
        return created;
    }

    // Ask the old thread to stop and interrupt it in case it is blocked.
    replaced.toStop(removeOldReason);
    replaced.interrupt();
    return created;
}

然后将调度中心发过来的请求参数都入队列
// Enqueue the trigger request on the job thread; typed as ReturnT<String> to match pushTriggerQueue's return type.
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

public class JobThread extends Thread{
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    	 
    	triggerLogIdSet.add(triggerParam.getLogId());
    	triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }
    }

前面说到,jobThreadRepository里面的jobThread在注册时就已经被启动了(registJobThread中调用了newJobThread.start())。
下面就来看看这个线程在run方法里做了什么。

/**
 * Worker thread that takes trigger requests off its queue and runs the job
 * handler for each one.
 *
 * NOTE(review): abbreviated excerpt. The declarations of triggerParam,
 * executeResult, idleTimes, running and the other fields are not shown, the
 * outer try block has no catch/finally here, and the closing brace of the
 * class is omitted.
 */
public class JobThread extends Thread{
 @Override
	public void run() {
    
		// Keep looping until the thread has been asked to stop.
		while(!toStop){
	 
            try {
			 // Take the next trigger request, waiting at most 3 seconds for one.
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
				if (triggerParam!=null) {
					running = true;
					idleTimes = 0;
					// The request has left the queue, so its log id is no longer pending.
					triggerLogIdSet.remove(triggerParam.getLogId());
                   // When an execution timeout is configured, run handler.execute()
                   // on a separate thread and wait for it for at most that long.
					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout
						Thread futureThread = null;
						try {
							// Final copy so the anonymous Callable can capture it.
							final TriggerParam triggerParamTmp = triggerParam;
							FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
								@Override
								public ReturnT<String> call() throws Exception {
									return handler.execute(triggerParamTmp.getExecutorParams());
								}
							});
							futureThread = new Thread(futureTask);
							futureThread.start();

							// The timeout value is interpreted in seconds.
							executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
						} catch (TimeoutException e) {

							XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
							XxlJobLogger.log(e);

							// Report the timeout using the FAIL_TIMEOUT result code.
							executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
						} finally {
							// Interrupt the helper thread whether or not it finished in time.
							futureThread.interrupt();
						}
					} else {
					// No timeout configured: call handler.execute() directly on this thread.
						// just execute
						executeResult = handler.execute(triggerParam.getExecutorParams());
					}

					// A null result from the handler is treated as a failure.
					if (executeResult == null) {
						executeResult = IJobHandler.FAIL;
					} else {
						// Cap the message at 50000 characters (plus "...").
						// The executeResult!=null test is redundant inside this branch.
						executeResult.setMsg(
								(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
										?executeResult.getMsg().substring(0, 50000).concat("...")
										:executeResult.getMsg());
						executeResult.setContent(null);	// limit obj size
					}
					XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

				} else {
					// Nothing arrived within the poll timeout: once the idle counter
					// exceeds 30, unregister this job thread.
					// NOTE(review): idleTimes is never incremented in this excerpt.
					if (idleTimes > 30) {
						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
					}
				}
			} 
			}
}

相关文章