Java多线程 —— 线程池原理及源码分析

使用线程池的好处

线程池也是池化技术的一个具体实现,这样说来,合理利用包括线程池等池化技术(如连接池等)能够带来三个好处:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的使用

在了解原理之前,先了解一下如何使用它,玩熟了再往深处挖。

线程池的创建

我们首先来看一下线程池的默认构造函数,以及可传入的参数:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize:线程池中常驻核心线程数;
  • maximumPoolSize:能够容纳同时执行的最大线程数,必须大于等于1;
  • keepAliveTime:多余的空闲线程存活时间(当前线程数超过corePoolSize时,空闲时间超过keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止);
  • unit:keepAliveTime的单位;
  • workQueue:任务队列,被提交但尚未被执行的任务;
  • threadFactory:生成线程池中工作线程的线程工厂,用于创建线程,一般用默认的即可;
  • handler:拒绝策略,当队列满了,并且工作线程大于等于最大线程数,执行拒绝策略。

这里要对两个重点概念进行说明:

workQueue

workQueue是用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

队列 说明
☆ArrayBlockingQueue 数组结构组成的有界阻塞队列
☆LinkedBlockingQueue 链表结构组成的有界(大小默认值为Integer.MAX_VALUE)阻塞队列(相当于无界)
PriorityBlockingQueue 支持优先级排序无界阻塞队列
DelayQueue 使用优先级队列实现的延迟无界阻塞队列
☆SynchronousQueue 不存储元素的阻塞队列,也即单个元素的队列
LinkedTransferQueue 链表结构组成的无界阻塞队列
LinkedBlockingDeque 链表结构组成的双向阻塞队列

handler(RejectHandler)

RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

以下是JDK1.8提供的四种策略:

策略 说明
AbortPolicy (默认实现,不敢用,动不动就崩)直接抛出异常。
CallerRunsPolicy 即不会抛弃任务,又不会抛出异常,而是将任务回退到调用者(比如退给main线程执行),从而降低新任务的流量。
DiscardOldestPolicy 丢弃队列里呆的最久的一个任务,然后把当前任务加入到队列中尝试再次提交。
DiscardPolicy 不处理,丢弃掉,也不抛出异常。如果允许任务丢失,这是最好的一种方案。
实现RejectedExecutionHandler接口 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

提交任务

线程池允许我们使用execute()和submit()两种方法提交任务:

  • execute提交任务,但是execute方法没有返回值,所以无法判断提交的任务是否被线程池执行成功;
  • submit方法来提交任务,它会将task封装为一个futureTask(后面我们还会提到),并返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值。
    • get()方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

线程池的关闭

我们可以通过调用线程池的shutdown()或shutdownNow()方法来关闭线程池,但是它们的实现原理不同:

  • shutdown()的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程;
  • shutdownNow()的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法的其中一个,isShutdown()方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated()方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown()来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow()

线程池的工作原理

工作流程

线程池的主要工作流程如下图:

ThreadPool

我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 首先线程池判断核心线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

线程池提交任务源码分析

这里我们还是分为submit和execute两种方式来分析。

当线程池调用submit()方法提交任务时,线程池会首先将这个Runnable的task封装为一个futureTask,然后跟execute的方式一样直接调用execute()方法来执行,此后将这个futureTask返回,便于以后get()来获取运行结果或拿到异常:

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

其后,这两种提交方式走的是同样的代码路径了。线程池调用executor()执行任务的方法如下(execute方式入口):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt记录着runState和workerCount
int c = ctl.get();
// workerCountOf方法取出低29位的值,表示当前活动的线程数
// 然后拿线程数和核心线程数做比较
if (workerCountOf(c) < corePoolSize) {
// 1. 判断核心线程池是否已满
// 如果活动线程数 < 核心线程数,添加到workers中
// addWorker中的第二个参数表示限制添加线程的数量
// 是根据corePoolSize来判断还是maximumPoolSize来判断
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果当前线程池是运行状态并且任务添加到队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 正在执行
reject(command);
else if (workerCountOf(recheck) == 0)
// 第一个参数为null,表示在线程池中创建一个线程,但不去启动
// 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
addWorker(null, false);
}
else if (!addWorker(command, false))
// 3. 执行饱和策略
reject(command);
}

接下来我们看一下execute()中多次调用的addWorker()方法,它的主要工作是在线程池中创建一个新的线程并执行:

参数:

  • firstTask:指定新创建的线程执行的第一个任务,为null则不执行任务;
  • core:如果是true则使用corePoolSize来绑定,否则使用maximumPoolSize绑定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 如果状态值 >= SHUTDOWN (不接新任务,且不处理队列任务)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 如果wc大于容量,或者core如果为true表示根据corePoolSize来比较,否则为maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 原子增加workerCount
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回重新执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
// 注意!!!
// 此处如果是execute方法调用,firstTask是一个Runnable
// 而如果是submit方法调用,firstTask是一个FutureTask
w = new Worker(firstTask);
// 根据worker创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

// firstTask == null表明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果t活着就抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 否则加入worker(HashSet)
// workers包含池中的所有工作线程。仅在持有mainLock时访问
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// worker的添加工作状态改为true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果worker的添加工作完成
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

线程池的异常处理

关于线程池的异常处理,这其实源于一次踩坑,现象是我发现通过submit()方法提交task(除0异常)之后,异常不打印出来了:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("test thread");
System.out.println(1 / 0);
}
});
}

控制台输出:

1
2
3
4
/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/bin/java ...
test thread

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

而通过execute()方法直接执行task,会抛出对应的异常:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("test thread");
System.out.println(1 / 0);
}
});
}

控制台输出:

1
2
3
4
5
6
test thread
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.leetcode.Result$1.run(Result.java:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

其实线程池对于异常的处理,跟提交线程的方式有关,分为submit的方式和execute的方式。

submit获取异常

上面我们分析过,submit()方法,是在AbstractExecutorService中实现的,它会将task封装成一个futureTask,然后execute()去执行。

接下来execute()方法是在ThreadPoolExecutor中实现的,整个execute()方法就是判断线程池状态,然后选择到底是new线程来执行还是加入等待队列,做事情的就是addWorker(),然后就会调用内部类Worker的run()方法。

在runWorker()中会调用futureTask的run()方法,如果出现了异常,会将这个Exception通过setException()的方式吞掉,它认为异常也是这个task执行的一部分

1
2
3
4
5
6
7
8
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 重点!!!
}

如果想拿到这个异常,需要调用futureTask的get()方法。

execute直接抛出异常

如果是execute()方法的话,task.run()直接调的就是Callable或者Runnable的run()方法了,所有的异常就直接抛了,因为没有futureTask的那层封装。

这里走的是Thread的run()方法。

UncaughtExceptionHandler是JVM调用的处理线程异常的:

1
2
3
4
5
6
7
/**
* Dispatch an uncaught exception to the handler. This method is
* intended to be called only by the JVM.
*/
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}

Executors类提供的几种线程池

理论聊了这么多,我们来看看juc中给我们提供了哪几种线程池的实现,以及它们各自都有什么特点。

线程池主要有5种(包含jdk 1.8):

1,Executors.newScheduledThreadPool():无限大小的线程池,每个线程存活时间是无限的,使用优先级队列实现的延迟无界阻塞队列,适用于周期性执行任务的场景。

1
2
3
4
5
public static ExecutorService newScheduledThreadPool(int corePoolSize) {
return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
0L, TimeUnit.NANOSECONDS,
new DelayedWorkQueue<Runnable>()));
}

2,Executors.newFixedThreadPool(int):固定数量线程的池子,每个线程存活时间是无限的,使用无界阻塞队列,适用于执行长期的任务。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

3,Executors.newSingleThreadPool():只有一个线程的池子,存活时间是无限的,使用无界阻塞队列,适用于一个任务一个任务执行的场景。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

4,Executors.newCachedThreadPool():新任务到来就插入到同步队列中,并且同时寻找可用线程来执行,使用单个元素的队列,适用于短期异步的任务或者负载较轻的服务器。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

5,Java8新出Executors.newWorkStealingPool(int):使用目前机器可用的处理器作为它的并行级别,采用ForkJoinPool来实现。

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

如何选择?

我们不禁眼花缭乱,juc给我们提供了如此多的的线程池的实现,那日常开发中如何选择哪种线程池来使用呢?

真相只有一个:哪个都不用!!!

【强制】《阿里巴巴Java开发手册》规定线程池不允许使用Executors去创建,而是通过new ThreadPoolExecutor的方式。

也就是通过ThreadPoolExecutor直接构造方法的方式。这主要是因为Executors返回线程池对象有其弊端:

  • FixedThreadPool和SingleThreadPool:允许请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,导致OOM;
  • CachedThreadPool和ScheduledThreadPool:允许创建线程数量为Integer.MAX_VALUE,可能会创建大量线程,导致OOM。

那么我们在手动创建线程池时,如何合理配置呢?

合理配置线程池

从线程数量上来看:

以Runtime.getRuntime.availableProcessors()获得的CPU核数为基准:

  • CPU密集型:配置Ncpu + 1个线程的线程池(线程数应当等于核心数,但是再怎么计算密集,总有一些IO吧,所以再加一个线程来把等待IO的CPU时间利用起来);
  • IO密集型:线程并不是一直在执行任务,则配置尽可能多的线程,如2 * Ncpu;
    • 或者Ncpu / (1 - 阻塞系数),阻塞系数在0.8 - 0.9之间,比如8核CPU, 8 / (1 - 0.9) = 80个线程;
    • 看具体测出来的情况吧。

从任务的执行时间来看:

可以使用优先级队列,让执行时间短的任务先执行。

总结

本来之前写过一篇关于线程池的文章的,但过于浅显。自近日以来觉得做技术不光要会使用、熟练使用,更要深入到源码层面理解其工作原理。Talk is cheap,show me the code!自认为本篇文章由表入里,已经将线程池的大部分内容分析透彻了。