Java Callable 和Future 教程

x33g5p2x  于2021-10-16 转载在 Java  
字(6.7k)|赞(0)|评价(0)|浏览(205)

欢迎阅读我的 Java 并发教程系列的第四部分。在之前的教程中,我们学习了并发、线程、可运行和执行程序服务的基础知识。在本教程中,我们将了解 Callable 和 Future。

Callable

在之前的教程中,我们使用 Runnable 对象来定义在线程内执行的任务。虽然使用 Runnable 定义任务非常方便,但受限于任务无法返回结果。

如果您想从任务中返回结果怎么办?

好吧,Java 提供了一个 Callable 接口来定义返回结果的任务。 CallableRunnable 类似,不同之处在于它可以返回结果并抛出已检查的异常。

Callable 接口有一个方法 call() ,它意味着包含由线程执行的代码。这是一个简单的 Callable 示例 -

Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        // Perform some computation
        Thread.sleep(2000);
        return "Return some result";
    }
};

请注意,对于 Callable,您不需要用 try/catch 块包围 Thread.sleep(),因为与 Runnable 不同,Callable 可以抛出已检查的异常。

您还可以像这样在 Callable 中使用 lambda 表达式 -

Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};

###使用ExecutorService执行Callable任务并使用Future获取结果

就像 Runnable 一样,您可以将 Callable 提交给执行程序服务以执行。但是 Callable 的结果呢?你如何访问它?

执行器服务的 submit() 方法将任务提交给线程执行。但是,它不知道提交的任务的结果何时可用。因此,它返回一种称为 Future 的特殊类型的值,可用于在任务可用时获取结果。

Future 的概念类似于 Javascript 等其他语言中的 Promise。它表示将在以后的某个时间点完成的计算结果。

以下是 Future 和 Callable 的简单示例 -

import java.util.concurrent.*;

public class FutureAndCallableExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> callable = () -> {
            // Perform some computation
            System.out.println("Entered Callable");
            Thread.sleep(2000);
            return "Hello from Callable";
        };

        System.out.println("Submitting Callable");
        Future<String> future = executorService.submit(callable);

        // This line executes immediately
        System.out.println("Do something else while callable is getting executed");

        System.out.println("Retrieve the result of the future");
        // Future.get() blocks until the result is available
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }

}
# Output
Submitting Callable
Do something else while callable is getting executed
Retrieve the result of the future
Entered Callable
Hello from Callable

ExecutorService.submit() 方法立即返回并给你一个 Future。一旦获得了future,就可以在提交的任务执行的同时并行执行其他任务,然后使用future.get()方法检索future的结果。

请注意, get() 方法会阻塞,直到任务完成。 Future API 还提供了一个 isDone() 方法来检查任务是否完成——

import java.util.concurrent.*;

public class FutureIsDoneExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "Hello from Callable";
        });

        while(!future.isDone()) {
            System.out.println("Task is still not done...");
            Thread.sleep(200);
        }

        System.out.println("Task completed! Retrieving the result");
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Hello from Callable

Cancelling a Future

您可以使用 Future.cancel() 方法Cancelling Future。它尝试取消任务的执行,如果取消成功则返回真,否则返回假。

cancel() 方法接受一个布尔参数 - mayInterruptIfRunning。如果为此参数传递值 true,则当前正在执行任务的线程将被中断,否则将允许完成正在进行的任务。

您可以使用 isCancelled() 方法来检查任务是否被取消。此外,取消任务后,isDone() 将始终为真。

import java.util.concurrent.*;

public class FutureCancelExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        long startTime = System.nanoTime();
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "Hello from Callable";
        });

        while(!future.isDone()) {
            System.out.println("Task is still not done...");
            Thread.sleep(200);
            double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;

            if(elapsedTimeInSec > 1) {
                future.cancel(true);
            }
        }

        System.out.println("Task completed! Retrieving the result");
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Exception in thread "main" java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:121)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at FutureCancelExample.main(FutureCancelExample.java:34)

如果你运行上面的程序,它会抛出一个异常,因为如果任务被取消,future.get() 方法会抛出CancellationException。我们可以通过在检索结果之前检查未来是否被取消来处理这个事实 -

if(!future.isCancelled()) {
    System.out.println("Task completed! Retrieving the result");
    String result = future.get();
    System.out.println(result);
} else {
    System.out.println("Task was cancelled");
}

添加超时

future.get() 方法阻塞并等待任务完成。如果在可调用任务中从远程服务调用 API 并且远程服务关闭,则 future.get() 将永远阻塞,这将使应用程序无响应。

为了防止这个事实,你可以在 get() 方法中添加一个超时 -

future.get(1, TimeUnit.SECONDS);

如果任务没有在指定时间内完成,future.get() 方法将抛出一个 TimeoutException

invokeAll

提交多个任务并等待所有任务完成。

您可以通过将 Callables 集合传递给 invokeAll() 方法来执行多个任务。 invokeAll() 返回一个期货列表。任何对 future.get() 的调用都将阻塞,直到所有 Futures 完成。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> task1 = () -> {
            Thread.sleep(2000);
            return "Result of Task1";
        };

        Callable<String> task2 = () -> {
            Thread.sleep(1000);
            return "Result of Task2";
        };

        Callable<String> task3 = () -> {
            Thread.sleep(5000);
            return "Result of Task3";
        };

        List<Callable<String>> taskList = Arrays.asList(task1, task2, task3);

        List<Future<String>> futures = executorService.invokeAll(taskList);

        for(Future<String> future: futures) {
            // The result is printed only after all the futures are complete. (i.e. after 5 seconds)
            System.out.println(future.get());
        }

        executorService.shutdown();
    }
}
# Output
Result of Task1
Result of Task2
Result of Task3

在上面的程序中,第一次调用 future.get() 语句会阻塞,直到所有的期货都完成。即结果将在 5 秒后打印。

invokeAny

提交多个任务并等待其中任何一个完成

invokeAny() 方法接受 Callables 的集合,并返回最快的 Callable 的结果。请注意,它不会返回 Future。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAnyExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> task1 = () -> {
            Thread.sleep(2000);
            return "Result of Task1";
        };

        Callable<String> task2 = () -> {
            Thread.sleep(1000);
            return "Result of Task2";
        };

        Callable<String> task3 = () -> {
            Thread.sleep(5000);
            return "Result of Task3";
        };

        // Returns the result of the fastest callable. (task2 in this case)
        String result = executorService.invokeAny(Arrays.asList(task1, task2, task3));

        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Result of Task2

结论

您可以在 my github repository 中找到本教程中使用的所有代码片段。我鼓励你 fork 回购并自己练习程序。

相关文章

微信公众号

最新文章

更多