xxl-job任务管理以及调度器端任务手动执行的原理

x33g5p2x  于2021-12-21 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(738)

1、在任务管理模块里面可以新增任务,可以执行任务、启动任务、编辑任务。同时可以查看任务日志。

2、我们看一下相关代码

3、基本都是数据库的增删改查,我们重点研究一下start(),stop()和triggerJob()
1)任务停止

/**
 * Stops scheduling of a job by resetting its trigger state in the database.
 *
 * @param id primary key of the job to stop
 * @return {@code ReturnT.SUCCESS} after the job row has been updated, or a
 *         {@code ReturnT.FAIL_CODE} result when no job could be loaded for {@code id}
 */
@Override
public ReturnT<String> stop(int id) {
    // Load the job definition from the database.
    XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
    if (xxlJobInfo == null) {
        // Presumably loadById yields null for an unknown id; fail explicitly instead of
        // hitting a NullPointerException on the setters below.
        // NOTE(review): switch to an I18nUtil message key if a suitable one exists.
        return new ReturnT<String>(ReturnT.FAIL_CODE, "job not found, id=" + id);
    }

    // Reset the three scheduling fields.
    // Trigger status: 0 = stopped, 1 = running.
    xxlJobInfo.setTriggerStatus(0);
    // Last trigger time.
    xxlJobInfo.setTriggerLastTime(0);
    // Next trigger time.
    xxlJobInfo.setTriggerNextTime(0);

    xxlJobInfoDao.update(xxlJobInfo);
    return ReturnT.SUCCESS;
}

2)任务开始运行

/**
 * Starts scheduling of a job: computes its next fire time from the job's cron expression
 * and marks the job as running.
 *
 * @param id primary key of the job to start
 * @return {@code ReturnT.SUCCESS} after the job row has been updated; a
 *         {@code ReturnT.FAIL_CODE} result when the job cannot be loaded, the cron
 *         expression cannot be parsed, or the cron expression has no future fire time
 */
public ReturnT<String> start(int id) {
    XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
    if (xxlJobInfo == null) {
        // Presumably loadById yields null for an unknown id; fail explicitly instead of
        // hitting a NullPointerException when the cron expression is read below.
        // NOTE(review): switch to an I18nUtil message key if a suitable one exists.
        return new ReturnT<String>(ReturnT.FAIL_CODE, "job not found, id=" + id);
    }

    // Next trigger time. The search starts PRE_READ_MS after "now" (the original comment
    // describes this as 5s) so the new time falls outside the scheduler's pre-read window.
    long nextTriggerTime = 0;
    try {
        Date nextValidTime = new CronExpression(xxlJobInfo.getJobCron())
                .getNextValidTimeAfter(new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));
        if (nextValidTime == null) {
            // The cron expression is valid but will never fire again.
            return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_never_fire"));
        }
        nextTriggerTime = nextValidTime.getTime();
    } catch (ParseException e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage());
    }

    // Update the three scheduling fields.
    // Trigger status: 0 = stopped, 1 = running.
    xxlJobInfo.setTriggerStatus(1);
    // Last trigger time.
    xxlJobInfo.setTriggerLastTime(0);
    // Next trigger time.
    xxlJobInfo.setTriggerNextTime(nextTriggerTime);
    xxlJobInfoDao.update(xxlJobInfo);
    return ReturnT.SUCCESS;
}

3)任务执行一次

// Shared JobTriggerPoolHelper instance; the static trigger(...) method forwards to its addTrigger(...).
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
  
	/**
	 * Request handler mapped to {@code /trigger}: requests one manual run of a job.
	 *
	 * @param id            id of the job to trigger
	 * @param executorParam executor parameter for this run; {@code null} is treated as an empty string
	 * @return {@code ReturnT.SUCCESS} once the trigger request has been handed to {@code JobTriggerPoolHelper}
	 */
	@RequestMapping("/trigger")
	@ResponseBody
	//@PermissionLimit(limit = false)
	public ReturnT<String> triggerJob(int id, String executorParam) {
		// Force-override the job param: a missing value is passed on as "" rather than null.
		final String effectiveParam = (executorParam != null) ? executorParam : "";

		// Manual trigger; -1 is passed as failRetryCount and null as executorShardingParam.
		JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, effectiveParam);
		return ReturnT.SUCCESS;
	}

     /**
      * Static entry point for requesting a job trigger. Forwards every argument unchanged to
      * {@code addTrigger} on the shared {@code helper} instance.
      *
      * @param jobId                 id of the job to trigger
      * @param triggerType           trigger type, passed through to {@code addTrigger}
      * @param failRetryCount        fail-retry count, passed through to {@code addTrigger}
      * @param executorShardingParam sharding parameter, passed through to {@code addTrigger}
      * @param executorParam         executor parameter, passed through to {@code addTrigger}
      */
     public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
            helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
        }

主动执行任务是通过调用JobTriggerPoolHelper.addTrigger()方法去实现的;addTrigger()会把触发请求提交到触发线程池,最终由线程池中的线程调用XxlJobTrigger.trigger()(即下面这段代码),继续跟踪代码

/**
 * Triggers a job once. With the SHARDING_BROADCAST route strategy, a non-empty registry list
 * and no explicit shard, every registered executor address is triggered with its own
 * (index, total) pair; otherwise a single trigger is issued.
 *
 * @param jobId                 id of the job to trigger
 * @param triggerType           what initiated this trigger
 * @param failRetryCount        retry count to use when {@code >= 0}; otherwise the job's own
 *                              {@code executorFailRetryCount} is used
 * @param executorShardingParam optional explicit shard in {@code "index/total"} form; {@code null}
 *                              or malformed values are treated as "no explicit shard"
 * @param executorParam         when non-null, replaces the job's executor param for this run
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
    // Load the job definition.
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        // Presumably loadById yields null for an unknown id; bail out instead of failing
        // with a NullPointerException below.
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid, jobId={}", jobId);
        return;
    }

    if (executorParam != null) {
        // Override the executor param for this run.
        jobInfo.setExecutorParam(executorParam);
    }
    // A negative failRetryCount means "use the job's own setting".
    int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
    // Load the executor group for this job; its registry list holds the executor addresses.
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // Explicit shard requested by the caller, or null when none (or an unparsable one) was given.
    int[] shardingParam = parseShardingParam(executorShardingParam);

    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
            && shardingParam == null) {
        // Sharding broadcast: trigger once per registered address, passing (index, total).
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        // Any other route strategy, or an explicit shard: single trigger, defaulting to shard 0 of 1.
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }
}

/** Parses {@code "index/total"} into {@code {index, total}}; returns null when absent or not exactly two integers. */
private static int[] parseShardingParam(String executorShardingParam) {
    if (executorShardingParam == null) {
        return null;
    }
    String[] parts = executorShardingParam.split("/");
    if (parts.length != 2) {
        return null;
    }
    try {
        return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    } catch (NumberFormatException ignored) {
        // A malformed shard is deliberately treated the same as "no shard given".
        return null;
    }
}

继续关注processTrigger()

/**
 * Performs one trigger of a job against one executor address: saves a log row, builds the
 * TriggerParam, picks the target address and calls runExecutor.
 *
 * NOTE(review): this snippet is truncated; the statements after step 4 and the method's closing
 * brace are not shown here.
 *
 * @param group               executor group whose registry list supplies the candidate addresses
 * @param jobInfo             job definition being triggered
 * @param finalFailRetryCount resolved fail-retry count (not used in the visible part of the method)
 * @param triggerType         trigger type (not used in the visible part of the method)
 * @param index               broadcast index, also used to pick the address for SHARDING_BROADCAST
 * @param total               broadcast total
 */
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // Resolve the block strategy; SERIAL_EXECUTION is passed as the second argument (presumably the fallback).
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  
        // Resolve the route strategy; null is passed as the second argument (presumably the fallback).
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1. Save a log row for this trigger
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        // NOTE(review): the placeholder is labelled "jobId" but the value passed is the log id.
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2. Initialise the trigger parameters
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        // NOTE(review): throws NullPointerException if getGlueUpdatetime() can return null - confirm.
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3. Choose the executor address
        String address = null;
        ReturnT<String> routeAddressResult = null;
        // group.getRegistryList() holds the executor addresses
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        // Sharding broadcast: use the address at position index, falling back to the first address when index is out of range
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
              // Any other route strategy: ask the strategy's router to pick an address from the registry list
              // NOTE(review): executorRouteStrategyEnum may be null here if match(...) returned its null second argument - confirm.
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            // No registered address: record a failed routing result
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4. Trigger the remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
           // Call runExecutor with the chosen address to run the job
            triggerResult = runExecutor(triggerParam, address);
        } else {
            // No address could be chosen: produce a failed result without calling any executor
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

继续跟踪runExecutor()

/**
 * Calls {@code run} on the {@code ExecutorBiz} obtained for {@code address} and returns the
 * result with a diagnostic message attached.
 *
 * <p>Any exception raised while obtaining the {@code ExecutorBiz} or calling {@code run} is
 * logged and converted into a {@code ReturnT.FAIL_CODE} result carrying the stack trace text.
 * The result's message is then replaced by an HTML fragment listing the address, the result
 * code and the original message.
 *
 * @param triggerParam parameters passed to {@code ExecutorBiz.run}
 * @param address      executor address to call
 * @return the run result (or the synthesised failure) with its message rewritten as described above
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> outcome;
    try {
        // Obtain the ExecutorBiz for this address and invoke run() on it.
        outcome = XxlJobScheduler.getExecutorBiz(address).run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        outcome = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // Assemble the report from the result's current code and message before overwriting the message.
    String report = new StringBuilder(I18nUtil.getString("jobconf_trigger_run") + ":")
            .append("<br>address:").append(address)
            .append("<br>code:").append(outcome.getCode())
            .append("<br>msg:").append(outcome.getMsg())
            .toString();

    outcome.setMsg(report);
    return outcome;
}

通过 XxlJobScheduler.getExecutorBiz(address)获取执行器代理类

/**
 * Builds an {@code ExecutorBiz} client for the executor at the given address by creating an
 * {@code XxlRpcReferenceBean} and returning the object it produces.
 *
 * <p>NOTE(review): the original snippet carried a "set-cache" comment and assigned to an
 * undeclared {@code executorBiz} variable. No cache is visible here, so a new reference bean
 * is created on every call; reinstate the cache lookup/put if the full class has one.
 *
 * @param address executor address passed to the reference bean
 * @return the {@code ExecutorBiz} obtained from {@code XxlRpcReferenceBean.getObject()}
 * @throws Exception declared by the original signature and kept for callers
 */
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // Declared as a local: the snippet previously assigned to a name that was never declared.
    ExecutorBiz executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
            NetEnum.NETTY_HTTP,
            Serializer.SerializeEnum.HESSIAN.getSerializer(),
            CallType.SYNC,
            LoadBalance.ROUND,
            ExecutorBiz.class,
            null,
            3000, // presumably the call timeout in milliseconds - confirm against the constructor
            address,
            XxlJobAdminConfig.getAdminConfig().getAccessToken(),
            null,
            null).getObject();
    return executorBiz;
}

同样我们关注一下getObject()方法

/**
 * Creates a java.lang.reflect.Proxy for the interface held in this.iface. Each call on the
 * proxy is packaged into an XxlRpcRequest and sent to the target address.
 *
 * NOTE(review): this snippet is abridged and truncated. The dotted line stands for omitted
 * code (finalAddress and parameters are defined there), and the closing braces and
 * non-SYNC call types are not shown.
 */
public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
              // Name of the class/interface that declares the invoked method (sent as the request's class name)
                String className = method.getDeclaringClass().getName();
                String varsion_ = XxlRpcReferenceBean.this.version;
                // Name of the invoked method
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
            ..........................................................
          
                    if (finalAddress != null && finalAddress.trim().length() != 0) {
                         // Build the RPC request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(XxlRpcReferenceBean.this.accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        XxlRpcFutureResponse futureResponse;
                        if (CallType.SYNC == XxlRpcReferenceBean.this.callType) {
                            futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);

                            Object var31;
                            try {
                            // Send the request (class name, method name, parameter types and values) to the target address,
                            // then wait for the response for at most this.timeout milliseconds
                                XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                               // Result value carried by the RPC response
                                var31 = xxlRpcResponse.getResult();
                            } catch (Exception var21) {
                                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                                throw (Throwable)(var21 instanceof XxlRpcException ? var21 : new XxlRpcException(var21));
                            } finally {
                                futureResponse.removeInvokerFuture();
                            }
                            // var31 is the remote call's result; it becomes the return value of the proxied method call
                            return var31;
                        }

到了这里我们就生成了执行器的动态代理ExecutorBiz,那么反过来再研究一下executorBiz.run(triggerParam):这段代码实际上是通过InvocationHandler对象的invoke()方法去执行的,在invoke方法中通过nettyClient向执行器发送了需要执行的方法名和参数,执行器通过RPC收到请求后就开始执行任务。

相关文章