构造可取消java任务的dag

plicqrtu  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(295)

我想在java中创建一个由任务组成的dag,其中的任务可能依赖于其他任务的输出。如果两个任务之间没有定向路径,则它们可以并行运行。任务可能被取消。如果任何任务引发异常,则所有任务都将取消。
我想用 CompleteableFuture 为此,尽管实施了 Future 接口(包括 Future.cancel(boolean) , CompletableFuture 不支持取消-- CompletableFuture.cancel(true) 只是被忽略了(有人知道为什么吗?)
因此,我求助于使用 Future . 这是一大堆陈词滥调,而且很复杂。还有比这更好的方法吗?
举个例子:
我想打电话 Process process = Runtime.getRuntime().exec(cmd) 要启动命令行进程,请创建 Future<Process> . 然后我想启动(扇出到)三个子任务:
一个任务消耗来自 process.getInputStream() 一个任务消耗来自 process.getErrorStream() 一项任务 process.waitFor() ,然后等待结果。
然后我要等待所有三个已启动的子任务完成(即扇入/a完成屏障)。这应该在期末考试中收集 Future<Integer> exitCode 它收集 process.waitFor() 任务。两个输入使用者任务只返回 Void ,因此可以忽略它们的输出,但完成屏障仍应等待它们的完成。
我希望任何已启动的子任务中的失败都会导致取消所有子任务,并销毁底层进程。
请注意 Process process = Runtime.getRuntime().exec(cmd) 在第一步中,可以抛出一个异常,这将导致失败一路级联到 exitCode .

@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
    public void accept(T val) throws IOException;
}

public static Future<Integer> exec(
        ConsumerThrowingIOException<InputStream> stdoutConsumer,
        ConsumerThrowingIOException<InputStream> stderrConsumer,
        String... cmd) {

    Future<Process> processFuture = executor.submit(
            () -> Runtime.getRuntime().exec(cmd));

    AtomicReference<Future<Void>> stdoutProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Void>> stderrProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Integer>> exitCodeFuture = //
            new AtomicReference<>();

    Runnable cancel = () -> {
        try {
            processFuture.get().destroy();
        } catch (Exception e) {
            // Ignore (exitCodeFuture.get() will still detect exceptions)
        }
        if (stdoutProcessorFuture.get() != null) {
            stdoutProcessorFuture.get().cancel(true);
        }
        if (stderrProcessorFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
        if (exitCodeFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
    };

    if (stdoutConsumer != null) {
        stdoutProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream inputStream = processFuture.get()
                        .getInputStream();
                stdoutConsumer.accept(inputStream != null
                        ? inputStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    if (stderrConsumer != null) {
        stderrProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream errorStream = processFuture.get()
                        .getErrorStream();
                stderrConsumer.accept(errorStream != null
                        ? errorStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    exitCodeFuture.set(executor.submit(() -> {
        try {
            return processFuture.get().waitFor();
        } catch (Exception e) {
            cancel.run();
            throw e;
        }
    }));

    // Async completion barrier -- wait for process to exit,
    // and for output processors to complete
    return executor.submit(() -> {
        Exception exception = null;
        int exitCode = 1;
        try {
            exitCode = exitCodeFuture.get().get();
        } catch (InterruptedException | CancellationException
                | ExecutionException e) {
            cancel.run();
            exception = e;
        }
        if (stderrProcessorFuture.get() != null) {
            try {
                stderrProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (stdoutProcessorFuture.get() != null) {
            try {
                stdoutProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (exception != null) {
            throw exception;
        } else {
            return exitCode;
        }
    });
}

注:我知道 Runtime.getRuntime().exec(cmd) 应该是非阻塞的,所以不需要自己的 Future ,但我还是用一个来编写代码,以说明dag构造的意义。

ercv8c1e

ercv8c1e1#

不可能。进程没有异步接口(process.onexit()除外)。因此,您必须使用线程来等待进程的创建,并在从inputstreams读取数据时使用线程。dag的其他组件可以是异步任务(completablefutures)。
这不是什么大问题。异步任务相对于线程的唯一优点是占用的内存更少。不管怎样,你的过程消耗了很多内存,所以在这里保存内存没有什么意义。

相关问题