oozie自定义异步操作

oiopk7p5  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(309)

在oozie中实现自定义异步操作时遇到问题。我的类从actionexecutor扩展而来,覆盖了initactiontype、start、end、check、kill和iscompleted方法。
在start方法中,我想启动一个yarn作业,它是通过我的biohadoopclient类实现的。为了使调用异步,我将client.run()方法 Package 为可调用的:

public void start(final Context context, final WorkflowAction action) {
...
  Callable<String> biohadoop = new Callable<String>() {
    BiohadoopClient client = new BiohadoopClient();
    client.run();
  }

  // submit callable to executor
  executor.submit(biohadoop);

  // set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
  context.setStartData(externalId, callBackUrl, callBackUrl);
...
}

这工作得很好,例如,当我以fork/join方式使用自定义操作时,操作的执行是并行的。
现在,问题是,oozie对于这些操作仍然处于运行状态。似乎不可能把它变成一个完整的状态。oozie从不调用check()方法,end()方法也是如此。在callable中手动设置context.setexternalstatus()、context.setexecutiondata()和context.setenddata()没有帮助(在client.run()完成之后)。我还尝试手动将actionendxcommand排队,但没有成功。
当我在start()方法中等待可调用的完成时,状态得到正确更新,但是fork/join中的执行不再是并行的(这似乎是逻辑上的,因为执行等待可调用的完成)。
外部客户机如何用http回调通知oozie工作流没有帮助,因为使用回调似乎没有改变什么(好吧,我可以看到它发生在日志文件中,但除此之外,什么都没有……)。另外,答案提到,ssh操作是异步运行的,但我还没有发现这是如何做到的。在可调用函数中有一些 Package ,但是在最后,可调用函数的call()方法被直接调用(不提交给执行器)。
到目前为止,我还没有找到任何编写异步自定义操作的示例。有人能帮我吗?
谢谢
编辑
以下是initactiontype()、start()、check()、end()的实现,可调用的实现可以在start()操作中找到。
可调用对象在start()操作中提交给一个执行器,然后调用它的shutdown()方法-因此执行器在可调用对象完成后关闭。下一步,调用context.setstartdata(externalid、callbackurl、callbackurl)。

private final AtomicBoolean finished = new AtomicBoolean(false);

public void initActionType() {
    super.initActionType();
    log.info("initActionType() invoked");
}

public void start(final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    log.info("start() invoked");

    // Get parameters from Node configuration
    final String parameter = getParameters(action.getConf());

    Callable<String> biohadoop = new Callable<String>() {
        @Override
        public String call() throws Exception {
            log.info("Starting Biohadoop");

            // No difference if check() is called manually
            // or if the next line is commented out
            check(context, action);

            BiohadoopClient client = new BiohadoopClient();
            client.run(parameter);
            log.info("Biohadoop finished");             

            finished.set(true);
            // No difference if check() is called manually
            // or if the next line is commented out
            check(context, action);

            return null;
        }
    };

    ExecutorService executor = Executors.newCachedThreadPool();
    biohadoopResult = executor.submit(biohadoop);
    executor.shutdown();

    String externalId = action.getId();
    String callBackUrl = context.getCallbackUrl("finished");
    context.setStartData(externalId, callBackUrl, callBackUrl);
}

public void check(final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    // finished is an AtomicBoolean, that is set to true,
    // after Biohadoop has finished (see implementation of Callable)
    if (finished.get()) {
        log.info("check(Context, WorkflowAction) invoked - 
            Callable has finished");
        context.setExternalStatus(Status.OK.toString());
        context.setExecutionData(Status.OK.toString(), null);
    } else {
        log.info("check(Context, WorkflowAction) invoked");
        context.setExternalStatus(Status.RUNNING.toString());
    }
}

public void end(Context context, WorkflowAction action)
        throws ActionExecutorException {
    log.info("end(Context, WorkflowAction) invoked");
    context.setEndData(Status.OK, Status.OK.toString());
}
ha5z0ras

ha5z0ras1#

有一件事-我可以看到你在提交工作后马上关闭了遗嘱执行人- executor.shutdown(); . 这可能是问题的根源。你能试着把这份声明移到 end() 方法代替?

4ngedf3f

4ngedf3f2#

最后我没有找到解决这个问题的“真正”办法。对我有效的解决方案是实现一个action,使用javaexecutor框架并行调用biohadoop示例。调用之后,我等待线程完成(仍在操作内部)

相关问题