并发编程-线程池
线程池基础
线程池执行流程
- 提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务。
- 如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
- 当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。
- 如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。
线程池状态
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
状态 | 说明 | 状态切换 |
---|---|---|
RUNNING | 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。创建时会调用此语句:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); | 线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0! |
SHUTDOWN | 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。 | 调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。 |
STOP | 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 | 调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。 |
TIDYING | 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。 | 当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。 |
TERMINATED | 线程池彻底终止,就变成TERMINATED状态。 | 线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。 |
线程池核心参数
线程池七大核心参数:
参数名称 | 参数含义 |
---|---|
int corePoolSize | 核心线程数 |
int maximumPoolSize | 最大线程数 |
long keepAliveTime | 线程存活时间 |
TimeUnit unit | 时间单位 |
BlockingQueue workQueue | 阻塞队列 |
ThreadFactory threadFactory | 线程创建工厂 |
RejectedExecutionHandler handler | 拒绝策略 |
corePoolSize 核心线程数
当往线程池中提交任务,会创建线程去处理任务,直到线程数达到corePoolSize,才会往阻塞队列中添加任务。默认情况下,空闲的核心线程并不会被回收,除非配置了allowCoreThreadTimeOut=true。
maximumPoolSize 最大线程数
当线程池中的线程数达到corePoolSize,阻塞队列又满了之后,才会继续创建线程,直到达到maximumPoolSize,另外空闲的非核心线程会被回收。
keepAliveTime 线程存活时间
非核心线程的空闲时间达到了keepAliveTime,将会被回收。
TimeUnit 时间单位
线程存活时间的单位,默认是TimeUnit.MILLISECONDS(毫秒),可选择的有:
TimeUnit.NANOSECONDS(纳秒) TimeUnit.MICROSECONDS(微秒) TimeUnit.MILLISECONDS(毫秒) TimeUnit.SECONDS(秒) TimeUnit.MINUTES(分钟) TimeUnit.HOURS(小时) TimeUnit.DAYS(天)
workQueue 阻塞队列
当线程池中的线程数达到corePoolSize,再提交的任务就会放到阻塞队列的等待,默认使用的是LinkedBlockingQueue,可选择的有:
LinkedBlockingQueue(基于链表实现的阻塞队列)
ArrayBlockingQueue(基于数组实现的阻塞队列)
SynchronousQueue(只有一个元素的阻塞队列)
PriorityBlockingQueue(实现了优先级的阻塞队列)
DelayQueue(实现了延迟功能的阻塞队列)
threadFactory 线程创建工厂
用来创建线程的工厂,默认的是Executors.defaultThreadFactory(),可选择的还有Executors.privilegedThreadFactory()实现了线程优先级。当然也可以自定义线程创建工厂,创建线程的时候最好指定线程名称,便于排查问题。
RejectedExecutionHandler 拒绝策略
当线程池中的线程数达到maximumPoolSize,阻塞队列也满了之后,再往线程池中提交任务,就会触发执行拒绝策略,默认的是AbortPolicy(直接终止,抛出异常),可选择的有:
*拒绝策略* | *说明* |
---|---|
ThreadPoolExecutor.AbortPolicy | 线程池**默认的阻塞策略。**不执行此任务,而且抛出一个运行时异常(未检查的异常RejectedExecutionException)。切记:ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。 |
ThreadPoolExecutor.DiscardPolicy | 不执行此任务,而且不抛异常。(是个空方法) |
ThreadPoolExecutor.DiscardOldestPolicy | 从队列里删除最老的任务(头部的一个任务),并再次execute 此task。 |
ThreadPoolExecutor.CallerRunsPolicy | 让调用execute方法的线程执行此command,会阻塞入口。这是个调节机制,既不抛弃任务也不抛出异常,而是将某些任务回退到调用者,让调用者所在的线程去执行。 |
用户自定义拒绝策略(最常用) | 实现RejectedExecutionHandler,并自己定义策略模式 |
线程池源码
一个线程池包括四个基本部分
- 线程管理池(ThreadPool):用于创建并管理线程池,有创建,销毁,添加新任务;
- 工作线程(PoolWorker):线程池中的线程在没有任务的时候处于等待状态,可以循环的执行任务;
- 任务接口(Task):每个任务必须实现接口,用来提供工作线程调度任务的执行,规定了任务的入口以及执行结束的收尾工作和任务的执行状态等;
- 任务队列:用于存放没有处理的任务,提供一种缓存机制。
- Executor是顶层接口
- 只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型
- 用来执行传进去的任务
- ExecutorService接口继承了Executor接口
- 声明了一些方法:submit、invokeAll、invokeAny以及shutDown等
- 抽象类AbstractExecutorService实现了ExecutorService接口
- 实现了ExecutorService中声明的所有方法
- ThreadPoolExecutor继承了类AbstractExecutorService
类 | 描述 |
---|---|
ExecutorService | 真正的线程池接口 |
ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。 |
ThreadPoolExecutor | ExecutorService的默认实现 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现 |
ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
// 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程个数所占的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大容量,2^29-1,约5亿个线程
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 独占锁,用来控制多线程下的并发操作
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程的集合
private final HashSet<Worker> workers = new HashSet<>();
// 等待条件,用来响应中断
private final Condition termination = mainLock.newCondition();
// 是否允许回收核心线程
private volatile boolean allowCoreThreadTimeOut;
// 线程数的历史峰值
private int largestPoolSize;
/**
* 以下是线程池的七大核心参数
*/
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
}
线程池的控制状态 ctl 用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。[一个变量存储了两块内容]
execute源码
线程池的核心逻辑:往线程池提交任务
// 往线程池中提交任务
public void execute(Runnable command) {
// 1. 判断提交的任务是否为null
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 2. 判断线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 3. 把任务包装成worker,添加到worker集合中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
// 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 7. 如果添加阻塞队列失败,就创建一个Worker
else if (!addWorker(command, false))
// 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略
reject(command);
}
execute方法最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:
把任务和线程包装成worker,添加到worker集合,并启动线程。
// 添加worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 检查是否允许提交任务
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
// 2. 使用死循环保证添加线程成功
for (; ; ) {
int wc = workerCountOf(c);
// 3. 校验线程数是否超过容量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 4. 使用CAS修改线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 5. 如果线程池状态变了,则从头再来
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 6. 把任务和新线程包装成一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 7. 加锁,控制并发
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 8. 再次校验线程池状态是否异常
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 9. 如果线程已经启动,就抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
// 10. 添加到worker集合中
workers.add(w);
int s = workers.size();
// 11. 记录线程数历史峰值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 12. 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
worker源码
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 工作线程
final Thread thread;
// 任务
Runnable firstTask;
// 创建worker,并创建一个新线程(用来执行任务)
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
runWorker源码
线程池执行方法 run()
runWorker(Worker w) 方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。
getTask() 从阻塞队列中拉取任务
// 线程执行入口
public void run() {
runWorker(this);
}
// 线程运行核心方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 1. 如果当前worker中任务是null,就从阻塞队列中获取任务
while (task != null || (task = getTask()) != null) {
// 加锁,保证thread不被其他线程中断(除非线程池被中断)
w.lock();
// 2. 校验线程池状态,是否需要中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 3. 执行run方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 解锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 4. 从worker集合删除当前worker
processWorkerExit(w, completedAbruptly);
}
}
// 从阻塞队列中拉取任务
private Runnable getTask() {
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 再次判断是否需要回收线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3. 从阻塞队列中拉取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池应用
线程池参数如何设置
根据任务所需要的cpu和io资源的量可以分为:
- CPU密集型任务:
- 主要是执行计算任务。响应时间很快,cpu一直在运行,这种任务cpu的利用率很高。
- 线程池大小太大对程序性能而言,反而是不利的,但最少也不应低于处理器的核心数。因为当有多个任务处于就绪状态时,处理器核心需要在线程间频繁进行上下文切换,而这种切换对程序性能损耗较大。
- IO密集型任务
- 主要是进行IO操作,执行IO操作的时间较长,这时cpu处于空闲状态,导致cpu的利用率不高。
- 当一个任务执行IO操作时,其线程将被阻塞,于是处理器可以立即进行上下文切换以便处理其他就绪线程。如果我们只有处理器可用核心数那么多个线程的话,即使有待执行的任务也无法处理,因为我们已经拿不出更多的线程供处理器调度了。
CPU密集型任务与IO密集型任务区分方法
如果任务被阻塞的时间少于执行时间,即这些任务是计算密集型的,则程序所需线程数将随之减少,但最少也不应低于处理器的核心数。
如果任务被阻塞的时间大于执行时间,即该任务是IO密集型的,我们就需要创建比处理器核心数大几倍数量的线程。例如,如果任务有50%的时间处于阻塞状态,则程序所需线程数为处理器可用核心数的两倍。
常用线程池大小设置
- CPU密集型:核心线程数 = CPU核数 + 1
- IO密集型:核心线程数 = CPU核数 * 2 + 1
CPU核数可以用此法获得:Runtime.getRuntime().availableProcessors()
对于计算密集型的任务,**一个有N个处理器的系统通常使用一个N+1个线程的线程池来获得最优的利用率。+1的原因:**如果计算密集型的线程恰好在某时因为发生一个页错误或者因其它原因而暂停,刚好有一个"额外"的线程,可以确保在这种情况下CPU周期不会中断工作。
计算公式
N = CPU的数量
U = 期望的CPU的使用率,介于0-1之间
f:阻塞系数(阻塞时间占总时间的比例。总时间:阻塞时间 + 执行时间)
线程池大小 = N * U / (1 - f) //一个完全阻塞的任务是注定要挂掉的,无须担心阻塞系数会达到1。
举例:CPU核心数是4,期望cpu的使用率是100%,等待时间是4秒,计算时间是1秒。那么最优的池大小就是:
4 * 100% / (1 - 4/5) = 20
@Configuration
public class ThreadPoolConfig {
/**
* @Bean中声明的value不能跟定义的实例同名
*
*/
@Bean(value = "customAsyncTaskExecutor")
public ThreadPoolTaskExecutor asyncThreadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setQueueCapacity(2048);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setThreadNamePrefix("customAsyncTaskExecutor-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
@Bean(value = "threadPoolExecutor")
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000), new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolExecutor;
}
}