[TOC]
线程池底层都是通过 ThreadPoolExecutor 来实现的
线程池的执行过程
这里用一个图来说明线程池的执行流程
任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,如果小于则创建线程来执行提交的任务,否则将任务放入workQueue队列,如果workQueue满了,则判断当前线程数量是否小于maximumPoolSize,如果小于则创建线程执行任务,否则就会调用handler,以表示线程池拒绝接收任务。
构造函数
1 | public ThreadPoolExecutor(int corePoolSize, |
参数介绍
参数 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maximumPoolSize | int | 最大线程数 |
keepAliveTime | long | 存活时间 |
unit | TimeUnit | 时间单位 |
workQueue | BlockingQueue | 存放线程的队列 |
threadFactory | ThreadFactory | 创建线程的工厂 |
handler | RejectedExecutionHandler | 多余的的线程处理器(拒绝策略) |
核心线程数corePoolSize
这个参数表示线程池中的基本线程数量也就是核心线程数量。
最大线程数maximumPoolSize[ˈmæksɪməm]
这个参数是线程池中允许创建的最大线程数量。
1 | 当使用有界队列时,且队列存放的任务满了,那么线程池会创建新的线程(最大不会超过这个参数所设置的值)。需要注意的是,**当使用无界队列时,这个参数是无效的。 |
线程存活时间keepAliveTime
这个就是非核心线程空闲时可以存活的时间,一旦超过这个时间,线程就会被销毁。
unit
keepAliveTime的单位。
workQueue
当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。
threadFactory
创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。
handler
线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。
系统默认的拒绝策略有以下几种:
- AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
- DiscardPolicy:直接抛弃不处理。
- DiscardOldestPolicy:丢弃队列中最老的任务。
- CallerRunsPolicy:将任务分配给当前执行execute方法线程来处理。
我们还可以自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可,友好的拒绝策略实现有如下:
- 将数据保存到数据,待系统空闲时再进行处理
- 将数据用日志进行记录,后由人工处理
问题
现有一个线程池,参数corePoolSize = 5,maximumPoolSize = 10,BlockingQueue阻塞队列长度为5,此时有4个任务同时进来,问:线程池会创建几条线程?
如果4个任务还没处理完,这时又同时进来2个任务,问:线程池又会创建几条线程还是不会创建?
如果前面6个任务还是没有处理完,这时又同时进来5个任务,问:线程池又会创建几条线程还是不会创建?
线程池corePoolSize=5,线程初始化时不会自动创建线程,所以当有4个任务同时进来时,执行execute方法会新建【4】条线程来执行任务;
前面的4个任务都没完成,现在又进来2个队列,会新建【1】条线程来执行任务,这时poolSize=corePoolSize,还剩下1个任务,线程池会将剩下这个任务塞进阻塞队列中,等待空闲线程执行;
如果前面6个任务还是没有处理完,这时又同时进来了5个任务,此时还没有空闲线程来执行新来的任务,所以线程池继续将这5个任务塞进阻塞队列,但发现阻塞队列已经满了,核心线程也用完了,还剩下1个任务不知道如何是好,于是线程池只能创建【1】条“临时”线程来执行这个任务了;
这里创建的线程用“临时”来描述还是因为它们不会长期存在于线程池,它们的存活时间为keepAliveTime,此后线程池会维持最少corePoolSize数量的线程。
IO密集型和CPU密集型
CPU密集型任务应配置尽可能小的线程,如配置CPU数目+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*CPU数目。
线程池大小的设置
问题1:
一个服务器有八个cpu,处理请求5ms,io操作200ms,理想情况下应该开什么线程?1s会处理多少请求?
8* U_cpu *(1+200/5)
计算密集型任务:
N = N_cpu + 1
- 加 1 的原因:当有一个线程偶尔故障时,额外的那个线程可以立即补上,保证CPU时钟不会被浪费
包含 I/O 或其他阻塞操作:
N = N_cpu * U_cpu * (1 + W / C)
N_cpu:CPU 的个数
U_cpu:目标 CPU 利用率
W / C:等待时间 (Wait) / 计算时间 (Compute)
获取 CPU 数目的方法:
int N_CPUS = Runtime.getRuntime().availableProcessors();
问题2
假设要求一个系统的 TPS(Transaction Per Second 或者 Task Per Second)至少为20,然后假设每个Transaction由一个线程完成,继续假设平均每个线程处理一个Transaction的时间为4s
如何设计线程个数,使得可以在1s内处理完20个Transaction?
20/(1/4)=80
问题3
计算操作需要5ms,DB操作需要 100ms,对于一台 8个CPU的服务器,怎么设置线程数呢?
线程数 = 8 * (1 + 100/5) = 168 (个)
那如果DB的 QPS(Query Per Second)上限是1000,此时这个线程数又该设置为多大呢?
一个线程每秒处理的任务数 1000/105,168个线程168*(1000/105)=1600QPS
168*(1000/1600)=105
问题4
任务处理时间 100ms,服务器 4 核 8G 如何设计线程池达到 1000qps?任务是 90ms 在 IO,10ms 在计算的情况下怎么弄?全在计算呢?
一个线程一秒处理10个任务 10QPS 1000/10=100个线程 4*(1+90/10)=40个线程
8 *(90/10+1)
四种类型
1.FixedThreadPool
所有任务只能使用固定大小的线程,超出的线程会在队列中等待。
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
FixedThreadPool
的corePoolSize
和maximumPoolSize
都设置为参数nThreads,keepAliveTime
为0L,表示多余的线程立刻终止,因为不会产生多余的线程它的任务队列采用的是LinkedBlockingQueue。
创建线程池的方法,在我们的程序中只需要,后面其他种类的同理:
1 | public static void main(String[] args) { |
2.CachedThreadPool
一个任务创建一个线程
1 | public static ExecutorService newCachedThreadPool() { |
CachedThreadPool
的corePoolSize
是0,maximumPoolSize
是Integer.MAX_VALUE,也就是说CachedThreadPool
没有核心线程,全部都是非核心线程,并且没有上限。keepAliveTime
是60秒,就是说空闲线程等待新任务60秒,超时则销毁。此处用到的队列是阻塞队列SynchronousQueue
[ˈsɪŋkrənəs],这个队列没有缓冲区,所以其中最多只能存在一个元素,有新的任务则阻塞等待。
3.SingleThreadExecutor
相当于大小为 1 的 FixedThreadPool。其创建源码如下:
1 | public static ExecutorService newSingleThreadExecutor() { |
我们可以看到总线程数和核心线程数都是1,所以就只有一个核心线程。该线程池才用链表阻塞队列LinkedBlockingQueue
,先进先出原则,所以保证了任务的按顺序逐一进行。
4.ScheduledThreadPool
ScheduledThreadPool
是一个能实现定时和周期性任务的线程池,它的创建源码如下:
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
这里创建了ScheduledThreadPoolExecutor
,继承自ThreadPoolExecutor
,主要用于定时延时或者定期处理任务。ScheduledThreadPoolExecutor
的构造如下:
1 | public ScheduledThreadPoolExecutor(int corePoolSize) { |
可以看出corePoolSize
是传进来的固定值,maximumPoolSize
无限大,因为采用的队列DelayedWorkQueue
是无解的,所以maximumPoolSize
参数无效。该线程池执行如下:
当执行scheduleAtFixedRate
或者scheduleWithFixedDelay
方法时,会向DelayedWorkQueue
添加一个实现RunnableScheduledFuture
接口的ScheduledFutureTask
(任务的包装类),并会检查运行的线程是否达到corePoolSize
。如果没有则新建线程并启动ScheduledFutureTask
,然后去执行任务。如果运行的线程达到了corePoolSize
时,则将任务添加到DelayedWorkQueue
中。DelayedWorkQueue
会将任务进行排序,先要执行的任务会放在队列的前面。在跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask
中的time
变量改为下次要执行的时间并放回到DelayedWorkQueue
中。
5.代码
1 | class test { |
1 | class test { |
6.对比
FixedThreadPool 适用于处理CPU密集型的任务,尽可能的少的分配线程,即适用执行长期的任务。
CachedThreadPool用于并发执行大量短期的小任务。
SingleThreadExecutor适用于串行执行任务的场景,一个任务一个任务地执行。
newScheduledThreadPool 周期性执行任务的场景,需要限制线程数量的场景
拒绝策略
Abort 策略
默认策略,新任务提交时直接抛出异常RejectedExecutionException,该异常可由调用者捕获。
CallerRuns 策略:
不会在线程池的线程中执行新的任务,而是在调用exector的线程中运行新的任务。
Discard策略:
直接丢弃新提交的任务;
DiscardOlds策略:
如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
代码
1 | package concurrency.pool; |
有哪些工作队列
SynchronousQueue [ˈsɪŋkrənəs]:
是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
1 | public static void main(String[] args) { |
ArrayBlockingQueue:
ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量
1 | //2个核心线程最大线程为3的线程池,阻塞队列大小为2 |
LinkedBlockingQueue
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE
1 | //2个核心线程最大线程为3的线程池,阻塞队列大小为2 |
DelayQueue
一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。
如何优雅关闭线程池
run和start
线程的run()方法是由java虚拟机直接调用的,如果我们没有启动线程(没有调用线程的start()方法)而是在应用代码中直接调用run()方法,那么这个线程的run()方法其实运行在当前线程(即run()方法的调用方所在的线程)之中,而不是运行在其自身的线程中,从而违背了创建线程的初衷;
下面是一个用来说明start()方法和run()方法的区别的实例:
1 | public class WelcomThread extends Thread { |
运行结果:
1 | main |
Runnable vs Callable
Runnable
自 Java 1.0 以来一直存在,但Callable
仅在 Java 1.5 中引入,目的就是为了来处理Runnable
不支持的用例。Runnable
接口不会返回结果或抛出检查异常,但是Callable
接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 *Runnable
接口*,这样代码看起来会更加简洁。
工具类 Executors
可以实现 Runnable
对象和 Callable
对象之间的相互转换。(Executors.callable(Runnable task
)或 Executors.callable(Runnable task,Object resule)
)。
1 | Runnable.java |
execute() vs submit()
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;- .对返回值的处理不同 execute方法不关心返回值。 submit方法有返回值,Future.
- 对异常的处理不同
excute方法会抛出异常。 sumbit方法不会抛出异常。除非你调用Future.get()
我们以AbstractExecutorService
接口中的一个 submit
方法为例子来看看源代码:
1 | public class Hero { |
shutdown()VS shutdownNow()
shutdown()
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。shutdownNow()
:关闭线程池,线程的状态变为STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
isTerminated() VS isShutdown()
isShutDown
当调用shutdown()
方法后返回为 true。isTerminated
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true
1 | import java.util.concurrent.ExecutorService; |