EagerThreadPool

首先来回顾下Java提供的默认线程池的工作流程:

  1. 如果线程池线程数量未到corePoolSize,即使有空闲线程,也会立即新建线程来执行任务
  2. 如果线程数量已达corePoolSize,则将任务放进阻塞队列workQueue
  3. 如果workQueue已满,线程池没有空闲线程且线程数量未到maximumPoolSize,则新建线程执行任务
  4. 如果workQueue已满,总线程数又达到了maximumPoolSize,则根据饱和策略handler来处理该任务

看到第二条,如果线程数量已达corePoolSize,但未达到maximumPoolSize,默认线程池还是会将任务放进阻塞队列。

来看这样一个场景,我们需要线程池来处理抵达的网络请求,平时我们不需要太多线程数量,高峰期需要一个大的线程数量,这是一个弹性的需求。同时高峰期我们肯定希望尽快处理网络请求,但按默认线程池处理策略,它在核心线程都在忙的时候会把请求丢到阻塞队列,然后等到阻塞队列满了才增加线程进行处理,这是我们不能接受的。这里出现的就是EagerThreadPool。

EagerThreadPool这个名词出现在阿里的dubbo框架中,但实际上很多容器如Tomcat也使用了相同的策略;它的与默认策略不同的是当核心线程数都忙且线程数量未到达最大线程数的时候,创建新的线程来处理任务而非丢进阻塞队列,这提高了吞吐量。

首先来看ThreadPoolExecutorexecute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//当小于核心线程数的时候创建新线程
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//核心线程数满,尝试入队
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//否则增加非核心线程执行
reject(command);
}

从源码中可以看到,我们需要介入的是workQueue.offer(command)这里的行为,只要这里为false,那么自然就会去创建新的线程。同时,还需要扩展该execute()方法,在线程数量满的时候令任务进入阻塞队列。

Tomcat和Dubbo都是通过继承ThreadPoolExecutorLinkedBlockingQueue重写executeoffer来实行这一行为的,下面来看看核心代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.apache.tomcat.util.threads;

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

//通过设置该变量来监控线程池状态
private transient volatile ThreadPoolExecutor parent = null;

...

//入队操作,如果非EagerThreadPool则抛出错误
public boolean force(Runnable o) {
if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}

@Override
public boolean offer(Runnable o) {
//parent为null,则无需干预线程池行为,使用默认行为即可
if (parent==null) return super.offer(o);

//当线程池线程数满直接放进队列
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);

//getSubmittedCount是Tomcat扩展方法,获取已提交但未完成的任务数量,这里判断说明有空闲线程,那么无需新建线程,丢尽队列等待调度即可
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);

//当前线程数量是否已满,未满则返回false使线程池创建新线程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;

return super.offer(o);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package org.apache.tomcat.util.threads;

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

//已提交但未完成的任务数量,包括在队列里和已经提交给工作线程但未开始执行的任务,数值>=getActiveCount()
private final AtomicInteger submittedCount = new AtomicInteger(0);

...

public int getSubmittedCount() {
return submittedCount.get();
}

public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
//首先执行ThreadPoolExecutor的execute,这里已经通过TaskQueue实现先新建线程而非入队
super.execute(command);
} catch (RejectedExecutionException rx) {
//判断是否使用TaskQueue,即是否使用的EagerThreadPool
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
//如果尝试入队失败
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
//尝试入队出错处理
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}

}
}

...
}

通过这些修改,就改变了默认策略,改变了线程池工作流程。