(拉钩)Android工程师进阶34讲-11:线程池之刨根问底

0. 前言

在介绍synchronized原理时,已经了解了Java中线程的创建以及上下切换是比较消耗性能的,因此引入了偏向锁、轻量级锁等优化技术,目的就是减少用户态和核心态之间的切换频率。但是在这些优化基础之上,还有另外一个角度值得思考:创建和销毁线程非常损耗性能,那有没有可能复用一些已经被创建的好的线程呢?那就是线程池。

另外,线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的空间,在线程销毁时需要回收这些系统资源,频繁的创建销毁线程会浪费大量资源,而通过复用已有线程可以更好地管理和协调线程的工作。

线程池主要解决两个问题:

  • 1、当执行大量异步任务时,线程池能提供更好的性能。
  • 2、线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。

1. 线程池体系

线程池体系如下图:

说明:

  • Executor:线程池最顶层的接口,在Executor中只有一个execute()方法,用于执行任务。至于线程创建、调度等细节由子类实现。
  • ExecutorService:继承并扩展了Executor,在ExecutorService内部提供了更全面的任务提交机制以及线程池关闭的方法。
  • ThreadPoolExecutor:是ExecutorService的默认实现,所谓的线程池机制也大多封装在此类当中,因此它是分析的重点。
  • ScheduledExecutorService继承自ExecutorService,增加了定时任务相关方法。
  • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,并实现了ScheduledExecutorService接口。
  • ForkJoinPool是一种支持任务分解的线程池,一般要配合可分解任务接口ForkJoinTask来使用。

2. 创建线程池

JDK提供了一个线程池的工厂类——Executors。在Executors中定义了多个静态方法,用来创建不同配置的线程池。常用有以下几种。

2.1 newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按先进先出的顺序执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CreateSingleThreadPool {
public static void main(String[] args) throws InterruptedException {
// 创建单线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

for (int i = 0; i <= 5; i++) {
final int taskId = i;
// 向线程池中提交任务
singleThreadExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
}
});
Thread.sleep(1000);
}
}
}

执行上述代码,结果如下,可以看出所有的task始终是在同一个线程中执行的。

1
2
3
4
5
6
线程:pool-1-thread-1 正在执行 task:0
线程:pool-1-thread-1 正在执行 task:1
线程:pool-1-thread-1 正在执行 task:2
线程:pool-1-thread-1 正在执行 task:3
线程:pool-1-thread-1 正在执行 task:4
线程:pool-1-thread-1 正在执行 task:5

2.2 newCachedThreadPool

创建一个可缓存线程,如果线程池长度超过处理需要,可灵活回收空线程,若无可回收,则新建线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CreateCacheThreadPool {
public static void main(String[] args) {
ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i <= 5; i++) {
final int taskId = i;

cacheThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
cacheThreadPool.shutdown();
}
}

执行效果如下:

1
2
3
4
5
6
线程:pool-1-thread-1 正在执行 task:0
线程:pool-1-thread-2 正在执行 task:1
线程:pool-1-thread-3 正在执行 task:2
线程:pool-1-thread-4 正在执行 task:3
线程:pool-1-thread-5 正在执行 task:4
线程:pool-1-thread-6 正在执行 task:5

从日志可以看出,缓存线程池会创建新的线程来执行任务。但是如果将代码修改一下,在提交任务之前休眠1秒,修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CreateCacheThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i <= 5; i++) {
final int taskId = i;

// 每次执行任务前休眠1秒
Thread.sleep(1000);

cacheThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
cacheThreadPool.shutdown();
}
}

打印如下:

1
2
3
4
5
6
线程:pool-1-thread-1 正在执行 task:0
线程:pool-1-thread-1 正在执行 task:1
线程:pool-1-thread-1 正在执行 task:2
线程:pool-1-thread-1 正在执行 task:3
线程:pool-1-thread-1 正在执行 task:4
线程:pool-1-thread-1 正在执行 task:5

2.3 newFixedThreadPool

创建一个固定数目的、可重用的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CreateFixedThreadPool {
public static void main(String[] args) {
// 创建线程数量为3的线程池
ExecutorService singleThreadThreadExecutor = Executors.newFixedThreadPool(3);
// 提交10个任务交给线程池执行
for (int i = 0; i <= 10; i++) {
final int taskId = i;

singleThreadThreadExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
}
});
}
}
}

打印:

1
2
3
4
5
6
7
8
9
10
11
线程:pool-1-thread-2 正在执行 task:1
线程:pool-1-thread-1 正在执行 task:0
线程:pool-1-thread-3 正在执行 task:2
线程:pool-1-thread-1 正在执行 task:4
线程:pool-1-thread-2 正在执行 task:3
线程:pool-1-thread-1 正在执行 task:6
线程:pool-1-thread-3 正在执行 task:5
线程:pool-1-thread-1 正在执行 task:8
线程:pool-1-thread-2 正在执行 task:7
线程:pool-1-thread-1 正在执行 task:10
线程:pool-1-thread-3 正在执行 task:9

2.4 newScheduledThreadPool

创建一个定时线程池,支持定时及周期性任务执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CreateScheduledThreadPool {
public static void main(String[] args) throws InterruptedException {
// 指定线程数量为2的定时任务线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Date now = new Date();
System.out.println("线程:" + Thread.currentThread().getName() + " 报时:" + now);
}
}, 500, 500, TimeUnit.MILLISECONDS);

Thread.sleep(5000);
// 关闭定时任务
scheduledThreadPool.shutdown();
}
}

上面代码创建了一个线程数量为2的定时任务线程池,通过scheduleAtFixedRate()方法,指定每隔500毫秒执行一次任务,并且在5秒之后通过shutdown()关闭定时任务。执行效果如下:

1
2
3
4
5
6
7
8
9
线程:pool-1-thread-1 报时:Tue Jul 28 14:25:04 CST 2020
线程:pool-1-thread-1 报时:Tue Jul 28 14:25:04 CST 2020
线程:pool-1-thread-2 报时:Tue Jul 28 14:25:05 CST 2020
线程:pool-1-thread-1 报时:Tue Jul 28 14:25:05 CST 2020
线程:pool-1-thread-2 报时:Tue Jul 28 14:25:06 CST 2020
线程:pool-1-thread-1 报时:Tue Jul 28 14:25:06 CST 2020
线程:pool-1-thread-2 报时:Tue Jul 28 14:25:07 CST 2020
线程:pool-1-thread-2 报时:Tue Jul 28 14:25:07 CST 2020
线程:pool-1-thread-1 报时:Tue Jul 28 14:25:08 CST 2020

上面是常用的几种线程池的使用方式,但是《阿里Java开发手册》中已经严禁使用Executors来创建线程池了,为什么呢?下面先看看线程池的工作原理。

3. 线程池工作原理分析

3.1 案例

某工艺品加工厂有三台机器用来生产订单所需的产品,正常情况下,三天机器能保证所有订单按时按需生产完毕,如下图:

如果订单突然大幅增加,三台机器已经处于满负荷状态,一时间无法完成新增的订单任务,怎么办呢?如果接下了新的订单,会将新来的订单暂时存放在仓库中,当有加工机器空闲时,再用来生产仓库中的订单,如下图:

如果订单持续快速增长,导致仓库也存储满了?又该如何?正常情况下加工厂肯定会通过购买新的加工机器来满足订单需求,如下图:

有了仓库和新买的机器,加工厂业务还能正常流转。但是当某些极端情况发生时,比如节假日之后的爆单。这时新增的订单连仓库以及所有的加工机器都不能满足,说明工厂已经不能接新的订单了,只能拒绝新的订单。

线程池的工作流程和上面描述的加工厂完成订单任务类似,并且在线程池的构造器中,通过传入的参数可以设置默认有多少台加工机器、仓库的大小、可以购买新的加工机器的最大数量等等。

3.2 线程池结构

从上图可以看出,线程池内部主要包含以下几个部分:

  • work集合:保存所有的核心线程和非核心线程,其本质是一个HashSet
  • 等待任务队列:当核心线程的个数达到corePiilSize时,提交新的任务会被先保存在等待队列中,其本质是一个阻塞队列BlockingQueue
  • ctl:是一个AtomicInteger类型,二进制高3位用来标识线程池的状态,低29位用来记录线程池中线程的数量。

获取线程池状态、工作线程数量、修改ctl的方法分别如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 计算当前运行状态
private static int runStateOf(int c){
return c & -CAPACITY;
}

// 计算当前线程数量
private static int workerCountOf(int c){
return c & CAPACITY;
}

// 通过状态和线程数量生成ctl
private static int ctlOf(int rs, int wc){
return rs | wc;
}

线程池主要有几下几种运行状态:

  • RUNNING:默认状态,接收新任务并处理排队任务;
  • SHUTDOWN:不接受新任务,但处理排队任务,调用shutDown()会处于该状态;
  • STOP:不接受新任务,也不处理排队任务,并中断正在运行的任务,调用shutDownNow()会处于该状态;
  • TIDYING:所有任务都已终止,workCount为0时,线程会转换到TIDYING状态,并将运行terminate()方法。
  • TERMINATED:terminate()运行完成后,线程池转为此状态。

3.3 参数分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

参数说明:

  • corePoolSIze:表示核心线程数量。
  • maximumPoolSize:表示线程池最大能容纳同时执行的线程数,必须 >= 1。如果和corePoolSize相等即是固定大小线程池。
  • keepAliveTime:表示线程池中的空闲时间,当空闲时间达到此值时,线程会被销毁直到剩下corePoolSize个线程。
  • unit:用来指定KeepAliveTime的时间单位,有MILLISECONDSSECONDSMINUTESHOURS等。
  • workQueue:等待队列,BlockingQueue类型。当请求任务数大于corePoolSize时,任务将被缓存到此BlockingQueue中。
  • threadFactory:线程工厂,线程池中使用它来创建线程,如果传入的是null,则使用默认工厂类DefaultThreadFactory
  • handler:执行拒绝策略的对象。当workQueue满了之后并且活动线程数大于maximumPoolSize时,线程池通过该策略处理请求。

注意:当ThreadPoolExecutorallowCoreThreadTimeOut设置为true时,核心线程超时后也会被销毁。

3.4 流程解析

当调用execute或者submit,将一个任务交给线程池,线程池接收这个任务请求后,有以下几种处理情况:

  • 1、当前线程池中运行的线程数量还没有达到corePoolSize大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态。例如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class LessThanCoreCount {
    public static void main(String[] args) throws InterruptedException {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

    for (int i = 1; i <= 5; i++) {
    final int taskId = i;

    fixedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
    // 任务耗时100毫秒
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    // 延时2s向线程池中提交任务
    Thread.sleep(2000);
    }
    fixedThreadPool.shutdown();
    }
    }

上面代码创建了3个固定数量的线程池,每次提交的任务耗时100毫秒。每次提交任务之前都会延迟2秒,保证线程池中的工作线程已经执行完毕的,但是执行效果如下:

1
2
3
4
5
线程:pool-1-thread-1 正在执行 task:1
线程:pool-1-thread-2 正在执行 task:2
线程:pool-1-thread-3 正在执行 task:3
线程:pool-1-thread-1 正在执行 task:4
线程:pool-1-thread-2 正在执行 task:5

虽然线程1和线程2都已经执行完毕,并处于空闲状态,但是线程池还是会尝试创建新的线程去执行新提交的任务,知道线程数量达到corePoolSize

  • 2、当前线程池中运行的线程数量已经达到corePoolSize时,线程池会把任务加到等待队列中,直到某一个线程空闲了,线程池会根据设置的等待队列规则,从队列中取出一个新的任务执行。例如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class MoreThanCoreCount {
    public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor fixedThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

    for (int i = 1; i <= 5; i++) {
    final int taskId = i;

    fixedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
    // 任务耗时4秒
    Thread.sleep(4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    System.out.println("此时等待队列中有 " + fixedThreadPool.getQueue().size() + " 个元素。");
    // 延时500毫秒向线程池中提交任务
    Thread.sleep(500);
    }
    fixedThreadPool.shutdown();
    }
    }

代码中,任务耗时4秒。此时新的任务提交给线程时,任务会缓存到等待队列中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1
此时等待队列中有 0 个元素。
线程:pool-1-thread-1 正在执行 task:1
此时等待队列中有 0 个元素。
线程:pool-1-thread-2 正在执行 task:2

// 2
此时等待队列中有 1 个元素。
此时等待队列中有 2 个元素。
此时等待队列中有 3 个元素。
线程:pool-1-thread-1 正在执行 task:3
线程:pool-1-thread-2 正在执行 task:4
线程:pool-1-thread-1 正在执行 task:5

1中通过两个核心线程直接执行提交的任务,因此等待队列中的数量为0;2中表明,此时核心线程都已经被占用,新提交的任务都被放入等待队列中。

  • 3、如果线程数大于corePoolSize,但是还没达到最大线程池数maximumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class NonCoreThread {
    public static void main(String[] args) throws InterruptedException {
    // 核心线程为2,最大线程数为10,等待队列长度为2
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 10, 0L,
    TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(2));

    for (int i = 1; i <= 5; i++) {
    final int taskId = i;

    threadPool.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
    // 任务耗时4秒
    Thread.sleep(4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    System.out.println("此时等待队列中有 " + threadPool.getQueue().size() + " 个元素。");
    // 延时500毫秒向线程池中提交任务
    Thread.sleep(500);
    }
    threadPool.shutdown();
    }
    }

执行效果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1
此时等待队列中有 0 个元素。
线程:pool-1-thread-1 正在执行 task:1
此时等待队列中有 0 个元素。
线程:pool-1-thread-2 正在执行 task:2

// 2
此时等待队列中有 1 个元素。
此时等待队列中有 2 个元素。
此时等待队列中有 2 个元素。
线程:pool-1-thread-3 正在执行 task:5 // 3

线程:pool-1-thread-1 正在执行 task:3
线程:pool-1-thread-2 正在执行 task:4

说明:

  • 1处表示线程数量已经达到corePoolSize

  • 2处表示等待队列已满。

  • 3处创建新的线程执行任务。

  • 4、最后如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建“非核心线程”直接执行,线程池将根据拒绝处理器定义的策略处理这个任务。比如在ThreadPoolExecutor中,如果没有为线程池设置RejectedExecutionHandler。这时线程池会抛出RejectedExecutionException,即线程池拒绝接受这个任务。如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class ThreadPoolRejectHandle {
    public static void main(String[] args) throws InterruptedException {
    // 核心线程为2,最大线程数为3,等待队列长度为2
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 0L,
    TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(2));

    for (int i = 1; i <= 5; i++) { // 提交6次任务
    final int taskId = i;

    threadPool.execute(new Runnable() {
    @Override
    public void run() {
    try {
    System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
    // 任务耗时5秒
    Thread.sleep(5000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    System.out.println("此时等待队列中有 " + threadPool.getQueue().size() + " 个元素。");
    }
    }
    }

执行效果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
此时等待队列中有 0 个元素。
此时等待队列中有 0 个元素。
此时等待队列中有 1 个元素。
此时等待队列中有 2 个元素。
此时等待队列中有 2 个元素。
线程:pool-1-thread-1 正在执行 task:1
线程:pool-1-thread-2 正在执行 task:2
线程:pool-1-thread-3 正在执行 task:5
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolRejectHandle$1@d716361 rejected from java.util.concurrent.ThreadPoolExecutor@6ff3c5b5[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at ThreadPoolRejectHandle.main(ThreadPoolRejectHandle.java:14)
线程:pool-1-thread-1 正在执行 task:3
线程:pool-1-thread-2 正在执行 task:4

程序报了RejectedExecutionException,拒绝策略是线程池的一种保护机制,目的就是当这种无节制的线程资源申请发生时,拒绝新的任务保护线程池。默认拒绝策略会直接报异常,但是JDK中一共提供了4中保护策略,如下:

名称 描述
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException。这时线程池默认的拒绝策略,在任务不能再提交时,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此策略,这样在系统不能承载更大的并发量的时候,能及时通过异常发现。
ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。使用此策略,可能会无法发现系统的异常状态。建议一些不重要的业务采用此策略。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。是否要采用此种拒绝策略,要根据实际业务是否允许丢弃老任务来衡量。
ThreadPoolExecutor.CallerRunsPolicy 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕。

实际上拒绝策略都是实现接口RejectedExecutionException,也可以自定义类实现接口,定义自己的拒绝策略。

整个流程的动画演示:漫画Java线程池的工作机制

4. 为何禁止使用Executors

再来看看对于禁止使用Executors,特别是newFixedThreadPoolnewCachedThreadPool两个方法。

比如如下使用newFixedThreadPool方法创建线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class FixedThreadPoolOOM {
public static void main(String[] args) {
ThreadPoolExecutor fixedThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
// 提交10个任务交给线程池执行
for (int i = 0; i <= 1000000; i++) {
final int taskId = i;
System.out.println("execute task:" + taskId);

fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
// 任务耗时10秒
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
fixedThreadPool.shutdown();
}
}

上述代码创建一个固定线程数量为2的线程池,并通过for循环向线程池中提交100W个任务。

通过java -Xms4m -Xmx4m FixedThreadPoolOOM执行上述代码:

可以看到当任务执行到7W多个时候,程序发生OOM。为什么呢?看一下newSingleThreadExecutor()newFixedThreadPool()的具体实现,如下:

1
2
3
4
5
6
7
8
9
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

可以看到传入的是一个无界的阻塞队列,理论上可以无限添加任务到线程池。当核心线程执行时间很长,则新提交的任务在不断插入到阻塞队列中,最终造成OOM。

再看一下newCachedThreadPool()有什么问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CacheThreadPoolOOM {
public static void main(String[] args) {
ThreadPoolExecutor fixedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
// 提交10个任务交给线程池执行
for (int i = 0; i <= 1000000; i++) {
final int taskId = i;
System.out.println("execute task:" + taskId);

fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName() + " 正在执行 task:" + taskId);
// 任务耗时10秒
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
fixedThreadPool.shutdown();
}
}

上述代码同样会报OOM,只是错误的log信息有点区别:无法创建新的线程。

看一下newCachedThreadPool()的实现:

1
2
3
4
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

可以看到,缓存线程池的最大线程数为Integer最大值。当核心线程耗时很久,线程池会尝试创建新的线程来执行提交任务,当内部不足时就会报无法创建线程的错误。

  • Copyrights © 2019-2020 Tyler Liu

请我喝杯咖啡吧~

支付宝
微信