ThreadPoolExecutor
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
构造函数参数说明:
corePoolSize => 线程池核心线程数量maximumPoolSize => 线程池最大数量keepAliveTime => 空闲线程存活时间unit => 时间单位workQueue => 线程池所使用的缓冲队列threadFactory => 线程池创建线程使用的工厂handler => 线程池对拒绝任务的处理策略

执行逻辑说明:
- 判断核心线程数是否已满,核心线程数大小和
corePoolSize
参数有关,未满则创建线程执行任务 - 若核心线程池已满,判断队列是否满,队列是否满和
workQueue
参数有关,若未满则加入队列中 - 若队列已满,判断线程池是否已满,线程池是否已满和
maximumPoolSize
参数有关,若未满创建线程执行任务 - 若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和
handler
参数有关
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
线程数目的控制
其中
corePoolSize 为正常情况下线程池中线程的数量。maximumPoolSize 为线程池线程数量的上限,当提交给线程池中得任务数量过多时线程池会临时创建新的工作线程来执行这些任务,但保证线程池中线程的总量不会超过maximumPoolSize 。keepAliveTime 表示当线程池中的线程数量超过了corePoolSize 的时候,如果一个线程的闲置时间超过了keepAliveTime 那么这个线程就有可能被消灭(TimeUnit 为keepAliveTime 的单位) 。
合理的配置这三个参数可以保证线程池“弹性”,当任务请求量大的时候线程池中的线程数量会扩大到
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到
任务队列
线程池基于生产消费者模式,提交任务的活动为生产者,处理任务的工作线程为消费者,实现这种工作模式就需要一个阻塞队列来承载提交执行的任务,工作线程从队列中获取提交的任务来执行。这种生产消费者模式可以保证如果线程池中工作线程的数量有限,那么可以保证并发执行任务的数量,相比于提交一个任务立马分配一个线程的模式,这种方式可以保证系统资源不会因为大量的请求而耗尽。
可以在构造
一个大的任务队列配合一个小的线程池可以有效的提高
-
执行的任务为同一种类。
-
任务与任务之间不存在相互依赖。
对于上述两种情况有界的队列可能会产生饥饿死锁的情况,如果出现了这两种情况则可以使用
饥饿策略
当线程池中的工作线程忙碌,且任务队列充满的时候,如果在有新的任务提交到线程池就会触发饥饿策略。与任务队列相似,
AbortPolicy
当触发饥饿策略时候,
// 线程池默认使用的是abortPolicy, 现在将任务持有的Q大小设置为1,说明等待的任务最有只能有一个
// ,如果等待任务超过一个则会触发abortPolicy,abortPolicy会抛出异常
ExecutorService executorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
executorService.submit(()-> {
SleepUtil.sleepSeconds(10);
System.out.println("task 1");
});
executorService.submit(()-> {
System.out.println("task 2");
});
executorService.submit(()-> {
System.out.println("task 3");
});
DiscardPolicy
CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
可以看出当任务队列充满之后并有什么方法可以阻塞新任务的提交,如果想实现这种策略可以使用
public class BoundedExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BoundedExecutor(Executor executor, Semaphore semaphore) {
this.executor = executor;
this.semaphore = semaphore;
}
public void submitTask(final Runnable command) throws InterruptedException {
// 提交任务之前,先获取semaphore,当semaphore数量为0时,方法阻塞,等待任务执行结束释放,这样可以保证提交任务的数量
semaphore.acquire();
try {
executor.execute(
() -> {
try {
command.run();
} finally {
semaphore.release();
}
}
);
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
ThreadPoolExecutor 扩展
生命周期
public class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
private final Logger logger = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
logger.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long end = System.nanoTime();
long taskTime = end - startTime.get();
// 记录任务的执行数量
numTasks.incrementAndGet();
// 记录任务的执行时间
totalTime.addAndGet(taskTime);
logger.fine(
String.format("Thread %s: end %s, time=%dns", t, r, taskTime)
);
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
// 打印每个任务执行的平均时间
logger.info(
String.format(
"Terminated: avg time=%dns",
totalTime.get() / numTasks.get()
)
);
} finally {
super.terminated();
}
}
}
线程工厂
线程池中的工作线程都是通过线程工厂来创建的,同样的
public interface ThreadFactory {
Thread newThread(Runnable r);
}
可以通过实现这个接口来定制线程工厂。