Java Callable和Future接口使用介绍

x33g5p2x  于2021-08-22 转载在 Java  
字(8.5k)|赞(0)|评价(0)|浏览(324)

在这篇文章中,我们将通过实例学习重要的两个并发接口CallableFuture 

Executor框架的优点之一是你可以运行返回结果的并发任务。Java并发API通过以下两个接口实现了这一点。

  1. Callable: 这个接口有call()方法。在这个方法中,你必须实现一个任务的逻辑。Callable接口是一个参数化的接口,意味着你必须指出call()方法将返回的数据类型。
  2. Future: 这个接口有一些方法来获取Callable对象产生的结果并管理其状态。

我们将学到什么?

  1. Callable接口
    2. Future接口

3. 使用ExecutorService执行Callable任务,并使用Future获得结果
4. Future - 检查任务的完成情况

  1. Future--取消一个Future

  2. Future - 添加超时

  3. Future - invokeAll

  4. Future - invokeAny

1. Callable接口

Callable是一个通用接口,它是这样定义的。

@FunctionalInterface
public interface Callable<V> {
    //*/*
/* Computes a result, or throws an exception if unable to do so.
/*
/* @return computed result
/* @throws Exception if unable to compute a result
/*/
    V call() throws Exception;
}

这里,V表示任务所返回的数据类型。Callable只定义了一个方法,call( ),在此显示。

V call( ) throws Exception

call( )中,你定义了你要执行的任务。在该任务完成后,你返回结果。如果不能计算出结果,call( )必须抛出一个异常。一个Callable任务由一个ExecutorService执行,通过调用其submit( )方法。submit( )有三种形式,但只有一种是用来执行Callable的。它显示在这里。

<T> Future<T> submit(Callable<T> task)

这里,任务是Callable对象,将在它自己的线程中执行。结果通过一个Future类型的对象返回。
例子。

Callable<String> callable = new Callable<String>() {
 
    @Override
    public String call() throws Exception {
       Thread.sleep(2000);
       return "Return some result";
    }
};

注意,Callable接口只有一个方法,所以它是一个函数式接口,我们可以对它应用lambda表达式,如。

Callable<String> callable  = () -> {
       Thread.sleep(2000);
       return "Return some result";
};

2. Future接口

Future是一个通用接口,它表示将由Callable对象返回的值。因为这个值是在Future的某个时间获得的,所以用Future这个名字很合适。
Future的定义是这样的。

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

这里,V指定了结果的类型。为了获得返回的值,你将调用Future的get( )方法,它有这两种形式。

V get( ) throws InterruptedException, ExecutionException
V get(long wait, TimeUnit tu) throws InterruptedException, ExecutionException, TimeoutException

第一种形式是无限期地等待结果。第二种形式允许你在等待中指定一个超时周期。

3. 使用ExecutorService执行Callable任务并使用Future获取结果

在Java中使用Callable和Future来实现返回值的线程更好。使用Executors框架来运行一个Callable任务。

下面是一个Future和Callable的简单例子。在这个例子中,我们使用一个Callable接口创建了五个任务,每个任务的工作是为每个任务给出的数字求和,结果存储在Future 接口中。
让我们首先创建任务--SumNumbers

class SumNumbers implements Callable<Integer> {
     private int n;
 
     public SumNumbers(int n) {
          this.n = n;
     }
 
     public Integer call() {
          int sum = 0;
          for (int i = 1; i <= n; i++) {
   
              sum += i;
              try {
                   Thread.sleep(200);
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
          }
          System.out.println("[" + Thread.currentThread().getName() + "] of sum " + sum);
          return sum;
     }
}

让我们使用ExecutorService.submit()方法创建并提交任务,立即返回并给你一个Future。一旦你获得了一个Future,你就可以在你提交的任务执行的同时并行地执行其他任务,然后使用future.get()方法来检索Future的结果。

public class ReturnValuesUsingCallable {
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
  
    System.out.println("Thread main started");
  
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<Integer> returnedValues = executorService.submit(new SumNumbers(10));
    System.out.println("Result of Future object:: " + returnedValues.get());
    executorService.shutdown();
  
    System.out.println("Thread main finished");
  }
}

输出:

Thread main started
[pool-1-thread-1] of sum 55
Result of Future object:: 55
Thread main finished

让我们用Java 8的Lambda表达式来简化上述例子。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReturnValuesUsingCallable {
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
  
        System.out.println("Thread main started");
  
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> returnedValues = executorService.submit(() -> {
             int sum = 0;
             for (int i = 1; i <= 5; i++) {
    
                 sum += i;
                 try {
                      Thread.sleep(200);
                 } catch (InterruptedException e) {
                      e.printStackTrace();
                 }
              }
            System.out.println("[" + Thread.currentThread().getName() + "] of sum " + sum);
            return sum;
       });
  
       System.out.println("Result of Future object:: " + returnedValues.get());
       executorService.shutdown();
  
       System.out.println("Thread main finished");
   }
}

输出。

Thread main started
[pool-1-thread-1] of sum 15
Result of Future object:: 15
Thread main finished

请注意,get()方法会阻塞,直到任务完成。Future API还提供了一个isDone()方法来检查任务是否已经完成。

4. 检查任务的完成情况

如果这个任务完成,返回true。完成可能是由于正常终止、异常或取消 -- 在所有这些情况下,该方法将返回true。

public class ReturnValuesUsingCallable {
 
     public static void main(String[] args) throws InterruptedException, ExecutionException {
  
         System.out.println("Thread main started");
  
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         Future<Integer> returnedValues = executorService.submit(() -> {
             int sum = 0;
             for (int i = 1; i <= 5; i++) {
                  sum += i;
                  try {
                        Thread.sleep(200);
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
             }
           System.out.println("[" + Thread.currentThread().getName() + "] of sum " + sum);
           return sum;
       });
  
       while(!returnedValues.isDone()) {
             System.out.println("Task is still not done...");
             Thread.sleep(200);
       }
   
       System.out.println("Result of Future object:: " + returnedValues.get());
       executorService.shutdown();
  
       System.out.println("Thread main finished");
    }
}

输出:

Thread main started
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
[pool-1-thread-1] of sum 15
Result of Future object:: 15
Thread main finished

5. 取消一个Future函数

你可以使用Future.cancel()方法取消一个Future。它试图取消任务的执行,如果成功取消则返回真,否则返回假。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

<<$14$>>

输出:

Thread main started
Task is still not done...
[pool-1-thread-1] of sum 15
Exception in thread "main" java.util.concurrent.CancellationException
 at java.util.concurrent.FutureTask.report(FutureTask.java:121)
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 at com.javaguides.javamultithreading.concurrency.ReturnValuesUsingCallable
        .main(ReturnValuesUsingCallable.java:32)

6. 添加超时

future.get()方法阻塞并等待任务的完成。如果你在Callable的任务中从远程服务中调用API,而远程服务已经停机,那么future.get()将永远阻塞,这将使应用程序无法响应。

为了防范这一事实,你可以在get()方法中添加一个超时器--

future.get(1, TimeUnit.SECONDS);

future.get()方法将抛出一个TimeoutException,如果任务没有在指定时间内完成。

7. invokeAll

提交多个任务并等待所有的任务完成。

你可以通过向invokeAll()方法传递一个Callable集合来执行多个任务。invokeAll()返回一个Futures的列表。任何对future.get()的调用都会阻塞,直到所有的Futures完成。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReturnValuesUsingCallable {
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
  
        System.out.println("Thread main started");
  
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Future<Integer>> returnedValues = executorService.invokeAll(Arrays.asList(
            new SumNumbers(50), 
            new SumNumbers(40),
            new SumNumbers(30),
            new SumNumbers(20),
            new SumNumbers(10)));
  
           for (Future<Integer> value : returnedValues) {
                System.out.println(value.get());
           }
  
           executorService.shutdown();
  
           System.out.println("Thread main finished");
     }
}

class SumNumbers implements Callable<Integer> {
      private int n;
 
      public SumNumbers(int n) {
           this.n = n;
      }
 
      public Integer call() {
         int sum = 0;
         for (int i = 1; i <= n; i++) {
              sum += i;
              try {
                  Thread.sleep(200);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
         }
         System.out.println("[" + Thread.currentThread().getName() + "] Sum " + sum);
         return sum;
      }
}

输出。

Thread main started
[pool-1-thread-5] Sum 55
[pool-1-thread-4] Sum 210
[pool-1-thread-3] Sum 465
[pool-1-thread-2] Sum 820
[pool-1-thread-1] Sum 1275
1275
820
465
210
55
Thread main finished

8. invokeAny

执行给定的任务,如果有任务成功完成(即没有抛出异常),则返回其中的结果。在正常或异常返回时,未完成的任务被取消。如果在这个操作进行中,给定的集合被修改,那么这个方法的结果将无法定义。

import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReturnValuesUsingCallable {
 
     public static void main(String[] args) throws InterruptedException, ExecutionException {
  
          System.out.println("Thread main started");
  
          ExecutorService executorService = Executors.newFixedThreadPool(5);
          Integer returnedValues = executorService.invokeAny(Arrays.asList(
          new SumNumbers(50), 
          new SumNumbers(40),
          new SumNumbers(30),
          new SumNumbers(20),
          new SumNumbers(10)));
  
          System.out.println(returnedValues);
  
          executorService.shutdown();
  
          System.out.println("Thread main finished");
      }
}

class SumNumbers implements Callable<Integer> {
     private int n;
 
     public SumNumbers(int n) {
         this.n = n;
     }
 
     public Integer call() {
         int sum = 0;
         for (int i = 1; i <= n; i++) {
              sum += i;
         }
         System.out.println("[" + Thread.currentThread().getName() + "] Sum " + sum);
         return sum;
     }
}

输出:

Thread main started
[pool-1-thread-1] Sum 1275
1275
Thread main finished

在Java多线程教程中学习多线程在Java并发教程中学习高级Java并发框架

相关文章