xxl-job执行器端的执行器自动注册原理

x33g5p2x  于2021-12-21 转载在 其他  
字(6.1k)|赞(0)|评价(0)|浏览(894)
前言

在v2.1.0 Release版本中已经移除了quartz依赖,降低了依赖,提高了系统的可控性和稳定性。
此版本才用的通讯方案是"NETTY_HTTP"方案,执行器内嵌netty-http-server提供服务,调度中心复用容器端口提供服务。

执行器注册流程
1、打开执行器管理,我们添加一条执行器

AppName: 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;(AppName在执行器的属性文件配置了 xxl.job.executor.appname=xxl-job-executor-sample
名称: 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;
排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
注册方式:调度中心获取执行器地址的方式;
       自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;
       手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;
机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息;

2、注册流程

1)先看看执行器中XxlJobConfig

2)创建XxlJobSpringExecutor 对象并且初始化执行start方法

@Configuration
public class XxlJobConfig {
  
   ..............................................
   //创建XxlJobSpringExecutor 对象,并且初始化执行XxlJobSpringExecutor 的start方法
    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

3)执行XxlJobSpringExecutor的start()方法

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
    @Override
    public void start() throws Exception {
        //初始化JobHandler 仓库  注册JobHandler 
        // init JobHandler Repository
        initJobHandlerRepository(applicationContext);
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
        //接着执行XxlJobExecutor的start()方法
        // super start
        super.start();
    }
    }

然后执行XxlJobExecutor 的start()方法

public class XxlJobExecutor  {
//执行start()方法
 public void start() throws Exception {

        //初始化admin调度中心代理类
        //轮询调度中心地址,多个地址用逗号隔开,每当遇到一个地址就生成动态代理类dminBiz
        //然后把所有的动态代理类都存入adminBizList中
        initAdminBizList(adminAddresses, accessToken);
        
        //初始化xxlRpc(通过netty实现) ,完成注册
        initRpcProvider(ip, port, appName, accessToken);
    }

}

我们进入initAdminBizList(),我们看到有个getObject()方法,此方法生成代理类。

invoke方法并不会立即执行

public Object getObject() {
       //获取调度中心代理类dminBiz
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {
           
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                String className = method.getDeclaringClass().getName();
                String varsion_ = XxlRpcReferenceBean.this.version;
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                Object[] parameters = args;
               。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

                 
                    if (finalAddress != null && finalAddress.trim().length() != 0) {
                       //封装请求  调度中心类名 方法名 以及参数
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(XxlRpcReferenceBean.this.accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        XxlRpcFutureResponse futureResponse;
                        if (CallType.SYNC == XxlRpcReferenceBean.this.callType) {
                            futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);

                            Object var31;
                            try {
                              //通过netty向调度中心请求执行注册
                                XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                 //var31=dminBiz
                                var31 = xxlRpcResponse.getResult();
                            } catch (Exception var21) {
                                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                                throw (Throwable)(var21 instanceof XxlRpcException ? var21 : new XxlRpcException(var21));
                            } finally {
                                futureResponse.removeInvokerFuture();
                            }

                            return var31;
                        }

我们再来看看initRpcProvider()

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
        // init, provider factory
        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);

        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        //初始化了一个XxlRpcInvokerFactory和XxlRpcProviderFactory
        //指定了使用netty服务,序列化工具,IP,端口,
        //并指定serviceRegistryClass为ExecutorServiceRegistry.class,执行器的自动注册就是在这请求的
        xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

        // add services 增加执行器服务到PRC,用ExecutorBizImpl作为服务的处理类,调用中心可以调用执行器处理任务
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

        // start 
        xxlRpcProviderFactory.start();

    }

xxlRpcProviderFactory.start()执行了ExecutorServiceRegistry初始化

public void start() throws Exception {
        this.serviceAddress = IpUtil.getIpPort(this.ip, this.port);
        this.server = (Server)this.netType.serverClass.newInstance();
        this.server.setStartedCallback(new BaseCallback() {
            public void run() throws Exception {
                if (XxlRpcProviderFactory.this.serviceRegistryClass != null) {
                //实例化ExecutorServiceRegistry
                    XxlRpcProviderFactory.this.serviceRegistry = (ServiceRegistry)XxlRpcProviderFactory.this.serviceRegistryClass.newInstance();
                    //执行ExecutorServiceRegistry的start()方法,此方法中实例化ExecutorRegistryThread线程并且启动线程执行注册
                    XxlRpcProviderFactory.this.serviceRegistry.start(XxlRpcProviderFactory.this.serviceRegistryParam);
                    if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
                        XxlRpcProviderFactory.this.serviceRegistry.registry(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
                    }
                }

            }
        });

我们再次跟进ExecutorServiceRegistry类

继续跟进start方法,在这个线程里面有个while循环,之前在getObject()方法中获取的adminBiz动态代理类的adminBizList集合被遍历。registryResult = adminBiz.registry(registryParam)触发了动态代理去注册,实际上通过InvocationHandler的invoke()反射执行registry()方法实现的。

invoke()完成向调度中心发送请求进行服务注册操作
通过反射获取xxl-rpc的代理,然后通过调度中心的地址向调度中心请求注册

调度中心处理执行器的请求

相关文章