线程池底层&四种类型&参数含义

[TOC]

线程池底层都是通过 ThreadPoolExecutor 来实现的

线程池的执行过程

这里用一个图来说明线程池的执行流程

img

任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,如果小于则创建线程来执行提交的任务,否则将任务放入workQueue队列,如果workQueue满了,则判断当前线程数量是否小于maximumPoolSize,如果小于则创建线程执行任务,否则就会调用handler,以表示线程池拒绝接收任务。

构造函数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}

参数介绍

参数 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 存活时间
unit TimeUnit 时间单位
workQueue BlockingQueue 存放线程的队列
threadFactory ThreadFactory 创建线程的工厂
handler RejectedExecutionHandler 多余的的线程处理器(拒绝策略)

核心线程数corePoolSize

这个参数表示线程池中的基本线程数量也就是核心线程数量。

最大线程数maximumPoolSize[ˈmæksɪməm]

这个参数是线程池中允许创建的最大线程数量。

1
当使用有界队列时,且队列存放的任务满了,那么线程池会创建新的线程(最大不会超过这个参数所设置的值)。需要注意的是,**当使用无界队列时,这个参数是无效的。

线程存活时间keepAliveTime

这个就是非核心线程空闲时可以存活的时间,一旦超过这个时间,线程就会被销毁。

unit

keepAliveTime的单位。

workQueue

当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。

threadFactory

创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。

handler

线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。

系统默认的拒绝策略有以下几种:

  1. AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
  2. DiscardPolicy:直接抛弃不处理。
  3. DiscardOldestPolicy:丢弃队列中最老的任务。
  4. CallerRunsPolicy:将任务分配给当前执行execute方法线程来处理。

我们还可以自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可,友好的拒绝策略实现有如下:

  1. 将数据保存到数据,待系统空闲时再进行处理
  2. 将数据用日志进行记录,后由人工处理

问题

现有一个线程池,参数corePoolSize = 5,maximumPoolSize = 10,BlockingQueue阻塞队列长度为5,此时有4个任务同时进来,问:线程池会创建几条线程?

如果4个任务还没处理完,这时又同时进来2个任务,问:线程池又会创建几条线程还是不会创建?

如果前面6个任务还是没有处理完,这时又同时进来5个任务,问:线程池又会创建几条线程还是不会创建?

线程池corePoolSize=5,线程初始化时不会自动创建线程,所以当有4个任务同时进来时,执行execute方法会新建【4】条线程来执行任务;

前面的4个任务都没完成,现在又进来2个队列,会新建【1】条线程来执行任务,这时poolSize=corePoolSize,还剩下1个任务,线程池会将剩下这个任务塞进阻塞队列中,等待空闲线程执行;

如果前面6个任务还是没有处理完,这时又同时进来了5个任务,此时还没有空闲线程来执行新来的任务,所以线程池继续将这5个任务塞进阻塞队列,但发现阻塞队列已经满了,核心线程也用完了,还剩下1个任务不知道如何是好,于是线程池只能创建【1】条“临时”线程来执行这个任务了;

这里创建的线程用“临时”来描述还是因为它们不会长期存在于线程池,它们的存活时间为keepAliveTime,此后线程池会维持最少corePoolSize数量的线程。

IO密集型和CPU密集型

CPU密集型任务应配置尽可能小的线程,如配置CPU数目+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*CPU数目。

线程池大小的设置

问题1:

一个服务器有八个cpu,处理请求5ms,io操作200ms,理想情况下应该开什么线程?1s会处理多少请求?

8* U_cpu *(1+200/5)

  • 计算密集型任务:

    N = N_cpu + 1

    • 加 1 的原因:当有一个线程偶尔故障时,额外的那个线程可以立即补上,保证CPU时钟不会被浪费
  • 包含 I/O 或其他阻塞操作:

    N = N_cpu * U_cpu * (1 + W / C)

    • N_cpu:CPU 的个数

    • U_cpu:目标 CPU 利用率

    • W / C:等待时间 (Wait) / 计算时间 (Compute)

    • 获取 CPU 数目的方法:int N_CPUS = Runtime.getRuntime().availableProcessors();

问题2

假设要求一个系统的 TPS(Transaction Per Second 或者 Task Per Second)至少为20,然后假设每个Transaction由一个线程完成,继续假设平均每个线程处理一个Transaction的时间为4s

如何设计线程个数,使得可以在1s内处理完20个Transaction?

20/(1/4)=80

问题3

计算操作需要5ms,DB操作需要 100ms,对于一台 8个CPU的服务器,怎么设置线程数呢?

线程数 = 8 * (1 + 100/5) = 168 (个)

那如果DB的 QPS(Query Per Second)上限是1000,此时这个线程数又该设置为多大呢?

一个线程每秒处理的任务数 1000/105,168个线程168*(1000/105)=1600QPS

168*(1000/1600)=105

问题4

任务处理时间 100ms,服务器 4 核 8G 如何设计线程池达到 1000qps?任务是 90ms 在 IO,10ms 在计算的情况下怎么弄?全在计算呢?

一个线程一秒处理10个任务 10QPS 1000/10=100个线程 4*(1+90/10)=40个线程

8 *(90/10+1)

四种类型

1.FixedThreadPool

所有任务只能使用固定大小的线程,超出的线程会在队列中等待。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

FixedThreadPoolcorePoolSizemaximumPoolSize都设置为参数nThreads,keepAliveTime为0L,表示多余的线程立刻终止,因为不会产生多余的线程它的任务队列采用的是LinkedBlockingQueue。

img

创建线程池的方法,在我们的程序中只需要,后面其他种类的同理:

1
2
3
4
public static void main(String[] args) {
// 参数是要线程池的线程最大值
ExecutorService executorService = Executors.newFixedThreadPool(10);
}

2.CachedThreadPool

一个任务创建一个线程

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CachedThreadPoolcorePoolSize是0,maximumPoolSize是Integer.MAX_VALUE,也就是说CachedThreadPool没有核心线程,全部都是非核心线程,并且没有上限。keepAliveTime是60秒,就是说空闲线程等待新任务60秒,超时则销毁。此处用到的队列是阻塞队列SynchronousQueue[ˈsɪŋkrənəs],这个队列没有缓冲区,所以其中最多只能存在一个元素,有新的任务则阻塞等待。

img

3.SingleThreadExecutor

相当于大小为 1 的 FixedThreadPool。其创建源码如下:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

我们可以看到总线程数和核心线程数都是1,所以就只有一个核心线程。该线程池才用链表阻塞队列LinkedBlockingQueue,先进先出原则,所以保证了任务的按顺序逐一进行。

img

4.ScheduledThreadPool

ScheduledThreadPool是一个能实现定时和周期性任务的线程池,它的创建源码如下:

1
2
3
4
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
复制代码

这里创建了ScheduledThreadPoolExecutor,继承自ThreadPoolExecutor,主要用于定时延时或者定期处理任务。ScheduledThreadPoolExecutor的构造如下:

1
2
3
4
5
6
    public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
复制代码

可以看出corePoolSize是传进来的固定值,maximumPoolSize无限大,因为采用的队列DelayedWorkQueue是无解的,所以maximumPoolSize参数无效。该线程池执行如下:

img

当执行scheduleAtFixedRate或者scheduleWithFixedDelay方法时,会向DelayedWorkQueue添加一个实现RunnableScheduledFuture接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到corePoolSize。如果没有则新建线程并启动ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务会放在队列的前面。在跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中的time变量改为下次要执行的时间并放回到DelayedWorkQueue中。

5.代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class test {
//初始化10个信号量在信号包中,让ABCD4个线程分别去获取
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
// ExecutorService threadPool = Executors.newCachedThreadPool();
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 6; i++) {
threadPool.submit(() -> {
System.out.println("current thread name: " + Thread.currentThread().getName());
Object object = null;
// System.out.print("result## "+object.toString());
});

}
}
}
1
2
3
4
5
6
7
8
9
10
class test {
//初始化10个信号量在信号包中,让ABCD4个线程分别去获取
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在执行");
}, 1, 3, TimeUnit.SECONDS);;
}
}

6.对比

FixedThreadPool 适用于处理CPU密集型的任务,尽可能的少的分配线程,即适用执行长期的任务。

CachedThreadPool用于并发执行大量短期的小任务。

SingleThreadExecutor适用于串行执行任务的场景,一个任务一个任务地执行。

newScheduledThreadPool 周期性执行任务的场景,需要限制线程数量的场景

拒绝策略

Abort 策略

默认策略,新任务提交时直接抛出异常RejectedExecutionException,该异常可由调用者捕获。

CallerRuns 策略:

不会在线程池的线程中执行新的任务,而是在调用exector的线程中运行新的任务。

Discard策略:

直接丢弃新提交的任务;

DiscardOlds策略:

如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package concurrency.pool;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by li on 2016/7/2.
*/
public class SaturationPolicy {

/**
* 线程池工作队列已满时,在不同饱和策略下表现
* @param handler 线程池工作队列饱和策略
*/
public static void policy(RejectedExecutionHandler handler){
//基本线程2个,最大线程数为3,工作队列容量为5
ThreadPoolExecutor exec = new ThreadPoolExecutor(2,3,0l, TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>(5));
if (handler != null){
exec.setRejectedExecutionHandler(handler);//设置饱和策略
}
for (int i = 0; i < 10; i++) {
exec.submit(new Task());//提交任务
}
exec.shutdown();
}

public static void main(String[] args) {
// policy(new ThreadPoolExecutor.AbortPolicy());
// policy((new ThreadPoolExecutor.CallerRunsPolicy()));
// policy(new ThreadPoolExecutor.DiscardPolicy());
// policy(new ThreadPoolExecutor.DiscardOldestPolicy());
}

//自定义任务
static class Task implements Runnable {
private static int count = 0;
private int id = 0;//任务标识
public Task() {
id = ++count;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);//休眠3秒
} catch (InterruptedException e) {
System.err.println("线程被中断" + e.getMessage());
}
System.out.println(" 任务:" + id + "\t 工作线程: "+ Thread.currentThread().getName() + " 执行完毕");
}
}

}

有哪些工作队列

SynchronousQueue [ˈsɪŋkrənəs]:

是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static void main(String[] args) {
//2个核心线程最大线程为3的线程池
Executor executors = new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new RejectHandler());
execute(executors);
}

private static void execute(Executor executors) {
executors.execute(new NameRunnable(1));
executors.execute(new NameRunnable(2));
executors.execute(new NameRunnable(3));
executors.execute(new NameRunnable(4));
executors.execute(new NameRunnable(5));
executors.execute(new NameRunnable(6));
}

private static class NameRunnable implements Runnable {
private int name;

public NameRunnable(int name) {
this.name = name;
}

public int getName() {
return name;
}

@Override
public void run() {
System.out.println(name + " is running... ");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " is end !!! ");
}
}

/***
* 拒绝的Runnable
*/
private static class RejectHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
NameRunnable name = (NameRunnable) r;

System.out.print(name.getName() + " is rejected ^^\n");
}
}

在这里插入图片描述

ArrayBlockingQueue:

ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量

1
2
3
4
5
6
//2个核心线程最大线程为3的线程池,阻塞队列大小为2
Executor executors = new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new RejectHandler());
execute(executors);

在这里插入图片描述

LinkedBlockingQueue

LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//2个核心线程最大线程为3的线程池,阻塞队列大小为2
public static void main(String[] args) {
Executor executors = new ThreadPoolExecutor(
2, 6, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
new RejectHandler());
execute(executors);
}

private static void execute(Executor executors) {
executors.execute(new NameRunnable(1));
executors.execute(new NameRunnable(2));
executors.execute(new NameRunnable(3));
executors.execute(new NameRunnable(4));
executors.execute(new NameRunnable(5));
executors.execute(new NameRunnable(6));
executors.execute(new NameRunnable(7));
executors.execute(new NameRunnable(8));
executors.execute(new NameRunnable(9));
executors.execute(new NameRunnable(10));
executors.execute(new NameRunnable(11));
executors.execute(new NameRunnable(12));
executors.execute(new NameRunnable(13));
executors.execute(new NameRunnable(14));
}

在这里插入图片描述

DelayQueue

一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

如何优雅关闭线程池

run和start

线程的run()方法是由java虚拟机直接调用的,如果我们没有启动线程(没有调用线程的start()方法)而是在应用代码中直接调用run()方法,那么这个线程的run()方法其实运行在当前线程(即run()方法的调用方所在的线程)之中,而不是运行在其自身的线程中,从而违背了创建线程的初衷;

下面是一个用来说明start()方法和run()方法的区别的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class WelcomThread extends Thread {
//在该方法中实现线程的任务逻辑
public void run() {
//获取当前正在执行的线程名称
System.out.println(Thread.currentThread().getName());
}
}
public class TestDemo1 {
public static void main(String[] args) {
Thread welcome = new WelcomThread(); //创建线程(动态规划)
welcome.run(); //直接调用run()方法
System.out.println(Thread.currentThread().getName());
Thread welcome1 = new WelcomThread(); //创建线程
welcome1.start(); //启动线程
}
}

运行结果:

1
2
3
main
main
Thread-1

Runnable vs Callable

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 *Runnable 接口*,这样代码看起来会更加简洁。

工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Runnable.java
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}Copy to clipboardErrorCopied
Callable.java
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
Copy to clipboardErrorCopied

execute() vs submit()

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. .对返回值的处理不同 execute方法不关心返回值。 submit方法有返回值,Future.
  3. 对异常的处理不同
    excute方法会抛出异常。 sumbit方法不会抛出异常。除非你调用Future.get()

我们以AbstractExecutorService接口中的一个 submit 方法为例子来看看源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Hero {

public static void main(String[] args) throws Exception{
submit();
submitWithGet();
// execute();
}
private static void submitWithGet() throws Exception{
ExecutorService service= Executors.newSingleThreadExecutor();
Future future=service.submit(new Runnable() {
@Override
public void run() {
int i=7/0;
}
});

future.get();

service.shutdown();
}
private static void submit(){
ExecutorService service= Executors.newSingleThreadExecutor();
service.submit(new Runnable() {
@Override
public void run() {
int i=7/0;
}
});

service.shutdown();
}
private static void execute(){
ExecutorService service= Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
@Override
public void run() {
int i=7/0;
}
});
service.shutdown();
}
}

shutdown()VS shutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test{
//由于只是为了测试下效果,所以随便搞个线程池来搭配,生产建议手动创建线程池
static ExecutorService executorService = Executors.newCachedThreadPool();

static class Task implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "运行任务");
}
}

public static void main(String[] args) throws Exception {
for (int i = 0; i < 100; i++) {
executorService.execute(new Task());
}
System.out.println("准备用shutdown方法关闭线程池");
executorService.shutdown();
TimeUnit.SECONDS.sleep(1);
System.out.println("线程池调用了shutdown方法,isShutdown=" + executorService.isShutdown());
System.out.println("任务还在执行中,isTerminated=" + executorService.isTerminated());
TimeUnit.SECONDS.sleep(3);
System.out.println("3秒后,任务都执行结束了,isTerminated=" + executorService.isTerminated());
}
}