快速开始

快速开始

实例化ExecutorService

实例化ExecutorService的方式有两种:一种是工厂方法,另一种是直接创建。

Executors.newFixedThreadPool()工厂方法创建ExecutorService实例

创建ExecutorService实例的最简单方法是使用Executors类的提供的工厂方法。比如:

ExecutorService executor = Executors.newFixedThreadPool(10);

当然还有其它很多工厂方法,每种工厂方法都可以创建满足特定用例的预定义ExecutorService实例。你所需要做的就是找到自己想要的合适的方法。

直接创建ExecutorService的实例

因为ExecutorService是只是一个接口,因此可以使用其任何实现类的实例。Java java.util.concurrent包已经预定义了几种实现可供我们选择,或者你也可以创建自己的实现。例如,ThreadPoolExecutor类实现了ExecutorService接口并提供了一些构造函数用于配置执行程序服务及其内部池。

ExecutorService executorService =
  new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue<Runnable>()
);

将任务分配给ExecutorService

ExecutorService可以执行RunnableCallable任务。

Runnable runnableTask = () -> {
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

创建完了任务之后,就可以使用多种方法将任务分配给ExecutorService ,比如execute()方法,还有submit()invokeAny()invokeAll()等方法。这些方法都继承自Executor接口。

  • 首先来看看execute()方法。

该方法返回值为空( void )。因此使用该方法没有任何可能获得任务执行结果或检查任务的状态( 是正在运行( running )还是执行完毕( executed )

executorService.execute(runnableTask);
  • 其次看看submit()方法。

submit()方法会将一个CallableRunnable任务提交给ExecutorService并返回Future类型的结果。

Future<String> future = executorService.submit(callableTask);
  • 然后是invokeAny()方法。

invokeAny()方法将一组任务分配给ExecutorService,使每个任务执行,并返回任意一个成功执行的任务的结果(如果成功执行)

String result = executorService.invokeAny(callableTasks);
  • 最后是invokeAll()方法。

invokeAll()方法将一组任务分配给ExecutorService ,使每个任务执行,并以Future类型的对象列表的形式返回所有任务执行的结果。

List<Future<String>> futures = executorService.invokeAll(callableTasks);

关闭ExecutorService

一般情况下,ExecutorService并不会自动关闭,即使所有任务都执行完毕,或者没有要处理的任务,也不会自动销毁ExecutorService 。它会一直出于等待状态,等待我们给它分配新的工作。这种机制,在某些情况下是非常有用的,比如,如果应用程序需要处理不定期出现的任务,或者在编译时不知道这些任务的数量。但另一方面,这也带来了副作用:即使应用程序可能已经到达它的终点,但并不会被停止,因为等待的ExecutorService将导致JVM继续运行。这样,我们就需要主动关闭ExecutorService。要正确的关闭ExecutorService,可以调用实例的shutdown()shutdownNow()方法。

ExecutorService接口继承了Executor接口,定义了一些生命周期的方法:

public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
}
  • shutdown方法:这个方法会平滑地关闭ExecutorService,当我们调用这个方法时,ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已 经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService

  • awaitTermination方法:这个方法有两个参数,一个是timeout即超 时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

  • shutdown()方法:

executorService.shutdown();

shutdown()方法并不会立即销毁ExecutorService实例,而是首先让ExecutorService停止接受新任务,并在所有正在运行的线程完成当前工作后关闭。

  • shutdownNow()方法:
List<Runnable> notExecutedTasks = executorService.shutDownNow();

shutdownNow()方法会尝试立即销毁ExecutorService实例,所以并不能保证所有正在运行的线程将同时停止。该方法会返回等待处理的任务列表,由开发人员自行决定如何处理这些任务。

因为提供了两个方法,因此关闭ExecutorService实例的最佳实战(也是Oracle所推荐的)就是同时使用这两种方法并结合awaitTermination()方法。使用这种方式,ExecutorService首先停止执行新任务,等待指定的时间段完成所有任务。如果该时间到期,则立即停止执行。

executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

Future接口

submit()方法和invokeAll()方法返回一个Future接口的对象或Future类型的对象集合。这些Future接口的对象允许我们获取任务执行的结果或检查任务的状态(是正在运行还是执行完毕

Future接口get()方法

Future接口提供了一个特殊的阻塞方法get(),它返回Callable任务执行的实际结果,但如果是Runnable任务,则只会返回null。因为get()方法是阻塞的。如果调用get()方法时任务仍在运行,那么调用将会一直被执阻塞,直到任务正确执行完毕并且结果可用时才返回。

而且更重要的是,正在被执行的任务随时都可能抛出异常或中断执行。因此我们要将get()调用放在try catch语句块中,并捕捉InterruptedExceptionExecutionException异常。

Future<String> future = executorService.submit(callableTask);
String result = null;
try {
    result = future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

因为 get() 方法是阻塞的,而且并不知道要阻塞多长时间。因此可能导致应用程序的性能降低。如果结果数据并不重要,那么我们可以使用超时机制来避免长时间阻塞。

String result = future.get(200, TimeUnit.MILLISECONDS);

这个 get() 的重载,第一个参数为超时的时间,第二个参数为时间的单位。上面的实例所表示就的就是等待200毫秒。

注意,这个 get() 重载方法,如果在超时时间内正常结束,那么返回的是Future类型的结果,如果超时了还没结束,那么将抛出TimeoutException异常。

除了get()方法之外,Future还提供了其它很多方法,我们将几个重要的方法罗列在此

方法 说明
isDone() 检查已分配的任务是否已处理
cancel() 取消任务执行
isCancelled() 检查任务是否已取消

这些方法的使用方式如下

boolean isDone      = future.isDone();
boolean canceled    = future.cancel(true);
boolean isCancelled = future.isCancelled();
上一页
下一页