How executor auto-registration works on the xxl-job scheduler side

Reposted by x33g5p2x on 2021-12-21, category: Other
Executor registration flow on the scheduler side

1、The executor sends a registration request, which is handled by the api() method of the scheduler's JobApiController.

2、Working backwards from there, look at the scheduling center's configuration class, XxlJobScheduler:

@Override
    public void afterPropertiesSet() throws Exception {

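        // start the auto-registration thread: it loads the executors whose address type is "auto-register" and handles machine registration and discovery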
        JobRegistryMonitorHelper.getInstance().start();

        // start the thread that monitors failed job logs
        JobFailMonitorHelper.getInstance().start();

        // start the admin-side RPC service so that executors can call the scheduling center's API
        initRpcProvider();

        // start the job schedule thread that fires triggers on time
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

Tracing into the initRpcProvider() method:

private void initRpcProvider(){
        // init
        XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
        xxlRpcProviderFactory.initConfig(
                NetEnum.NETTY_HTTP,
                Serializer.SerializeEnum.HESSIAN.getSerializer(),
                null,
                0,
                XxlJobAdminConfig.getAdminConfig().getAccessToken(),
                null,
                null);

        // register the AdminBiz service with xxlRpcProviderFactory
        xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());

        // initialize servletServerHandler
        servletServerHandler = new ServletServerHandler(xxlRpcProviderFactory);
    }
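The addService call above effectively puts the AdminBiz implementation into a lookup table that invokeService() later reads by service key. The following is a minimal sketch of that idea, not the xxl-rpc class itself: `ServiceTableSketch`, `GreetingBiz` and `GreetingBizImpl` are hypothetical names, and the key format (interface name plus an optional `#version` suffix) is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the provider factory's service table.
class ServiceTableSketch {
    private final Map<String, Object> serviceData = new ConcurrentHashMap<>();

    // Assumed key format: the interface name, plus "#version" when a version is given.
    static String makeServiceKey(String iface, String version) {
        if (version == null || version.trim().isEmpty()) {
            return iface;
        }
        return iface + "#" + version;
    }

    // Registers a service bean under its key, like addService(name, null, bean) above.
    void addService(String iface, String version, Object serviceBean) {
        serviceData.put(makeServiceKey(iface, version), serviceBean);
    }

    // Returns the registered bean, or null when nothing is registered under that key.
    Object lookup(String iface, String version) {
        return serviceData.get(makeServiceKey(iface, version));
    }

    // Hypothetical service interface and implementation, for illustration only.
    interface GreetingBiz {
        String greet(String name);
    }

    static class GreetingBizImpl implements GreetingBiz {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    public static void main(String[] args) {
        ServiceTableSketch table = new ServiceTableSketch();
        table.addService(GreetingBiz.class.getName(), null, new GreetingBizImpl());
        Object bean = table.lookup(GreetingBiz.class.getName(), null);
        System.out.println(((GreetingBiz) bean).greet("executor"));
    }
}
```

Because the version passed in the article's code is null, the key in this sketch reduces to the interface name alone.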

3、Once servletServerHandler has been initialized, every request handled by the api() method of the scheduler's JobApiController ends up in servletServerHandler.handle(null, request, response).

public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
    servletServerHandler.handle(null, request, response);
}

    public void handle(String target, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        StringBuffer stringBuffer;
        if (!"/services".equals(target)) {
            stringBuffer = null;

            XxlRpcRequest xxlRpcRequest;
            try {
                // deserialize the HTTP request into an XxlRpcRequest
                xxlRpcRequest = this.parseRequest(request);
            } catch (Exception var7) {
                this.writeResponse(response, ThrowableUtil.toString(var7).getBytes());
                return;
            }
            // look up the target service and invoke the requested method via reflection
            XxlRpcResponse xxlRpcResponse = this.xxlRpcProviderFactory.invokeService(xxlRpcRequest);
            byte[] responseBytes = this.xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
            this.writeResponse(response, responseBytes);
        }
        // (the rest of the method is not shown in this excerpt)
    }
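The handle() method boils down to three steps: turn the request bytes into a request object, hand it to an invoker, and serialize the response back to bytes. Below is a rough, self-contained sketch of that pipeline. It uses plain JDK serialization as a stand-in for the Hessian serializer configured earlier, and `HandlePipelineSketch`, `DemoRequest` and `DemoResponse` are hypothetical names rather than xxl-rpc types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class HandlePipelineSketch {

    // Hypothetical request type; the real XxlRpcRequest carries class/method names and parameters.
    static class DemoRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        final String requestId;
        final String payload;

        DemoRequest(String requestId, String payload) {
            this.requestId = requestId;
            this.payload = payload;
        }
    }

    // Hypothetical response type.
    static class DemoResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        final String requestId;
        final String result;

        DemoResponse(String requestId, String result) {
            this.requestId = requestId;
            this.result = result;
        }
    }

    // JDK serialization, used here only as a stand-in for Hessian.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException("serialize failed", e);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("deserialize failed", e);
        }
    }

    // Mirrors the three steps of handle(): parse, invoke, write back.
    static byte[] handle(byte[] requestBytes, Function<DemoRequest, DemoResponse> invoker) {
        DemoRequest request;
        try {
            request = (DemoRequest) deserialize(requestBytes);
        } catch (RuntimeException e) {
            // As in the original, a parse failure is answered with the error text as raw bytes.
            return e.toString().getBytes(StandardCharsets.UTF_8);
        }
        return serialize(invoker.apply(request));
    }
}
```

Calling `handle` with a serialized `DemoRequest` and a lambda that builds a `DemoResponse` yields bytes the caller can deserialize again, which is what the executor side does with the scheduler's reply.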

4、Finally, look at what invokeService() actually does:

public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        // serviceBean is the AdminBizImpl instance
        Object serviceBean = this.serviceData.get(serviceKey);
        .....................................................................
            try {
                Class<?> serviceClass = serviceBean.getClass();
                // for a registration request, methodName is "registry"
                String methodName = xxlRpcRequest.getMethodName();
                Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
                Object[] parameters = xxlRpcRequest.getParameters();
                // reflection: look up the method by name and parameter types
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                method.setAccessible(true);
                // invoke the registry method reflectively to perform the registration, and return its result
                Object result = method.invoke(serviceBean, parameters);
                xxlRpcResponse.setResult(result);
            } catch (Throwable var11) {
                logger.error("xxl-rpc provider invokeService error.", var11);
                xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var11));
            }

            return xxlRpcResponse;
    }
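The reflective dispatch in invokeService() can be tried in isolation. The sketch below assumes a hypothetical service bean, `DemoAdminBiz`, standing in for AdminBizImpl, and a hypothetical `Result` holder in place of XxlRpcResponse; only the `getMethod` / `invoke` calls are the same JDK reflection API used above.

```java
import java.lang.reflect.Method;

class ReflectiveInvokeSketch {

    // Hypothetical service bean standing in for AdminBizImpl.
    static class DemoAdminBiz {
        public String registry(String group, String key, String value) {
            return "registered " + group + "/" + key + " -> " + value;
        }
    }

    // Hypothetical result holder standing in for XxlRpcResponse.
    static class Result {
        Object value;
        String errorMsg;
    }

    // Looks up a public method by name and parameter types, then invokes it on the bean.
    static Result invoke(Object serviceBean, String methodName,
                         Class<?>[] parameterTypes, Object[] parameters) {
        Result result = new Result();
        try {
            Method method = serviceBean.getClass().getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            result.value = method.invoke(serviceBean, parameters);
        } catch (Throwable t) {
            // Like the original, failures are reported in the response instead of being rethrown.
            result.errorMsg = t.toString();
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = invoke(new DemoAdminBiz(), "registry",
                new Class<?>[]{String.class, String.class, String.class},
                new Object[]{"EXECUTOR", "demo-app", "http://127.0.0.1:9999/"});
        System.out.println(r.errorMsg == null ? r.value : r.errorMsg);
    }
}
```

The caller only supplies a method name, parameter types and arguments, which is why the scheduler needs no registration-specific code in this layer.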

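Now look at the registry method itself: it updates the executor's record in the xxl_job_registry table, or inserts one if no matching record exists.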

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
    if (ret < 1) {
        xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
    }
    return ReturnT.SUCCESS;
}
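The update-then-insert pattern in registry() can be illustrated without a database. In this sketch an in-memory list plays the role of the xxl_job_registry table; `RegistryUpsertSketch` and `Row` are hypothetical names, and the assumption that the update refreshes a timestamp on the matching row is made here for illustration, since the DAO's SQL is not shown in the article.

```java
import java.util.ArrayList;
import java.util.List;

class RegistryUpsertSketch {

    // Hypothetical row of the registry table.
    static class Row {
        final String group;
        final String key;
        final String value;
        long updateTime;

        Row(String group, String key, String value, long updateTime) {
            this.group = group;
            this.key = key;
            this.value = value;
            this.updateTime = updateTime;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    // Refreshes the timestamp of every matching row and returns how many rows matched.
    int registryUpdate(String group, String key, String value, long now) {
        int updated = 0;
        for (Row row : rows) {
            if (row.group.equals(group) && row.key.equals(key) && row.value.equals(value)) {
                row.updateTime = now;
                updated++;
            }
        }
        return updated;
    }

    // Inserts a new row and returns the number of rows inserted.
    int registrySave(String group, String key, String value, long now) {
        rows.add(new Row(group, key, value, now));
        return 1;
    }

    // Same shape as registry() above: try the update first, insert only if nothing matched.
    void registry(String group, String key, String value, long now) {
        if (registryUpdate(group, key, value, now) < 1) {
            registrySave(group, key, value, now);
        }
    }

    int size() {
        return rows.size();
    }

    long updateTimeOf(int index) {
        return rows.get(index).updateTime;
    }
}
```

Registering the same group, key and value twice leaves one row with a refreshed timestamp, while a new address for the same key adds a second row. Note that update-then-insert is not atomic on its own, so this sketch says nothing about how concurrent first-time registrations are handled.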
