Friday, March 15, 2013

[Repost] Executor Framework Source Code Analysis

Clipped from: http://hi.baidu.com/ethancheer/item/cfdc2eea475ed20e65db0024


The Executor interface is the foundation of the whole framework; it defines a single method, void execute(Runnable command).
ExecutorService extends Executor and adds lifecycle methods.
AbstractExecutorService implements ExecutorService and provides a template implementation of it.

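First, the typical way to use the Executors class:
1. Call the static newCachedThreadPool or newFixedThreadPool method of Executors.
2. Call submit to submit a Runnable or Callable object.
3. If you want to be able to cancel the task, or if you submitted a Callable, keep the returned Future object.
4. Call shutdown when you no longer want to submit any tasks.
The general pattern looks like this: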
ExecutorService pool = Executors.newCachedThreadPool();
Callable<String> call0 = new FutureTest(0);
Future<String> future0 = pool.submit(call0);
System.out.println(future0.get());
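
The snippet above depends on a FutureTest class that is not shown. As a minimal self-contained sketch of the same four steps, the example below uses a hypothetical EchoTask as a stand-in for it (the class name and its return value are assumptions made for illustration):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorBasics {
    // Hypothetical stand-in for the FutureTest class used in the snippet above.
    static class EchoTask implements Callable<String> {
        private final int id;

        EchoTask(int id) {
            this.id = id;
        }

        @Override
        public String call() {
            return "task-" + id + " done";
        }
    }

    static String runOne(int id) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();    // step 1: create the pool
        try {
            Future<String> future = pool.submit(new EchoTask(id)); // steps 2 and 3: submit, keep the Future
            return future.get();                                   // blocks until the task has finished
        } finally {
            pool.shutdown();                                       // step 4: accept no further tasks
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOne(0)); // prints "task-0 done"
    }
}
```

Calling shutdown in a finally block lets the pool's threads exit even when get throws.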


Now let's look closely at some of the Executors methods, starting with newCachedThreadPool():
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
As you can see, it constructs a ThreadPoolExecutor and passes in a SynchronousQueue. You can also supply your own ThreadFactory:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
}
Usually the default ThreadFactory is used. ThreadFactory has a newThread method that creates threads; here is the implementation in Executors:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}// the constructor initializes the fields that newThread relies on

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;// every thread created is non-daemon and has NORM_PRIORITY
}
}
As the code above shows, all threads created by a given DefaultThreadFactory instance belong to the same ThreadGroup.
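
For comparison, here is a minimal sketch of a custom ThreadFactory passed to newCachedThreadPool(ThreadFactory). NamedThreadFactory, its "worker" prefix, and the choice to create daemon threads are assumptions for illustration, not part of the JDK:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {
    // Hypothetical factory: names threads "<prefix>-<n>" and marks them as daemon threads.
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    }

    // Runs one task and returns the name of the pool thread that executed it.
    static String firstWorkerName() throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool(new NamedThreadFactory("worker"));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstWorkerName()); // prints "worker-1"
    }
}
```

Meaningful thread names make thread dumps and log lines much easier to attribute to a specific pool.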
Next, the ThreadPoolExecutor constructor:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
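The parameters mean the following:
  • corePoolSize: the number of core threads the pool keeps alive even when they are idle. The pool does not start with this many threads; they are created on demand as tasks arrive
  • maximumPoolSize: the maximum number of threads the pool may hold
  • keepAliveTime: how long a thread may stay idle; a thread that receives no task within this time is destroyed (by default this applies only to threads beyond corePoolSize)
  • unit: the time unit of keepAliveTime
  • workQueue: the queue used to buffer tasks
  • handler: the policy applied when the pool has reached maximumPoolSize, workQueue is full, and another task arrives
defaultHandler specifies the handler to use when execution is blocked because the thread bounds and queue capacities are reached.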
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
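The default is AbortPolicy: when the pool and its queue are both full and a new task arrives, the task is rejected with a RejectedExecutionException. ThreadPoolExecutor defines four policies: AbortPolicy, DiscardPolicy, DiscardOldestPolicy and CallerRunsPolicy.

Each policy implements the RejectedExecutionHandler interface, whose void rejectedExecution(Runnable r, ThreadPoolExecutor executor) method defines the strategy. Here is the source of each policy: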
AbortPolicy:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
Throws an exception directly;
DiscardPolicy:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
Does nothing and silently drops the task;
DiscardOldestPolicy:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
Discards the task at the head of the queue (the oldest one), then retries execution of the new task;
CallerRunsPolicy:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
Runs the rejected task directly in the caller's thread. You can choose any of the four policies; AbortPolicy is the default.
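
To see the policies in action, the following sketch saturates a deliberately tiny pool (one thread plus one queue slot) and then submits one more task. The class and method names, the pool sizes, and the returned marker strings are assumptions chosen for the demo:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Returns the name of the thread that ran the extra task, "not run" if it had not run
    // by the time execute() returned, or "rejected" if execute() threw.
    static String submitToSaturatedPool(RejectedExecutionHandler handler) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = { "not run" };
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // goes to the handler
            return ranOn[0];
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        // When run from the main thread this prints "rejected", "main" and "not run".
        System.out.println(submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

CallerRunsPolicy is the only built-in policy that slows the submitter down, which makes it a simple form of back-pressure.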

Now for the implementation of the other factory methods:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
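These methods all construct a ThreadPoolExecutor internally; only the arguments differ.
newCachedThreadPool hands tasks over through a SynchronousQueue, which has no capacity: a submitted task is accepted by the queue only if an idle thread is already waiting to take it, and otherwise a new thread is created to run it. This suits programs that execute many short-lived asynchronous tasks.
newFixedThreadPool creates at most nThreads threads and keeps them alive, and it uses a LinkedBlockingQueue as the task buffer, so waiting tasks are dequeued in FIFO order.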
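
The difference between the two configurations can be observed directly. The sketch below builds pools with the same constructor arguments as the two factory methods shown above, submits tasks that all block, and reports how many threads each pool had to create. The class and method names are assumptions for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Submits `tasks` tasks that block until released, then reports the largest pool size reached.
    static int threadsCreated(ThreadPoolExecutor pool, int tasks) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    // Same arguments as Executors.newCachedThreadPool().
    static int cachedPoolThreads(int tasks) throws InterruptedException {
        return threadsCreated(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>()), tasks);
    }

    // Same arguments as Executors.newFixedThreadPool(nThreads).
    static int fixedPoolThreads(int nThreads, int tasks) throws InterruptedException {
        return threadsCreated(new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()), tasks);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cachedPoolThreads(4));   // prints 4: one thread per blocked task
        System.out.println(fixedPoolThreads(2, 4)); // prints 2: the other two tasks wait in the queue
    }
}
```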

Now the execute method of ThreadPoolExecutor in detail:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
} else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
It first checks command and then poolSize, which is defined as follows:
/**
* Current pool size, updated only while holding mainLock but volatile to
* allow concurrent readability even during updates.
*/
private volatile int poolSize;
According to its comment, it is the current number of threads in the pool. Next, addIfUnderCorePoolSize is called:
/**
* Creates and starts a new thread running firstTask as its first task, only
* if fewer than corePoolSize threads are running and the pool is not shut
* down.
*
* @param firstTask
*            the task the new thread should run first (or null if none)
* @return true if successful
*/
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
// fewer threads than corePoolSize: add a thread to the pool; it is started below before returning
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
The addThread method:
/**
* Creates and returns a new thread running firstTask as its first task.
* Call only while holding mainLock.
*
* @param firstTask
*            the task the new thread should run first (or null if none)
* @return the new thread, or null if threadFactory fails to create thread
*/
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
The Worker class is the basic building block of the thread pool; it wraps a Runnable. Its fields are:
/**
* The runLock is acquired and released surrounding each task execution.
* It mainly protects against interrupts that are intended to cancel the
* worker thread from instead interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();

/**
* Initial task to run before entering run loop. Possibly null.
*/
private Runnable firstTask;

/**
* Per thread completed task counter; accumulated into
* completedTaskCount upon termination.
*/
volatile long completedTasks;

/**
* Thread this worker is running in. Acts as a final field, but cannot
* be set until thread is created.
*/
Thread thread;
workers is defined as follows:
/**
* Set containing all worker threads in pool. Accessed only when holding
* mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

Starting the Worker's thread in addIfUnderCorePoolSize runs the Worker's run method:
/**
* Main run loop
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) { // leave the loop when no task can be obtained
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
As you can see, a worker keeps trying to fetch a task and run it. First, how the pool hands out tasks; the getTask code:
/**
* Gets the next task for a worker thread to run. The general approach is
* similar to execute() in that worker threads trying to get a task to run
* do so on the basis of prevailing state accessed outside of locks. This
* may cause them to choose the "wrong" action, such as trying to exit
* because no tasks appear to be available, or entering a take when the pool
* is in the process of being shut down. These potential problems are
* countered by (1) rechecking pool state (in workerCanExit) before giving
* up, and (2) interrupting other workers upon shutdown, so they can recheck
* state. All other user-based state changes (to allowCoreThreadTimeOut etc)
* are OK even when performed asynchronously wrt getTask.
*
* @return the task
*/
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;// the pool is stopping (shutdownNow was called): hand out no more tasks
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();// shutdown state: the tasks left in the queue must still be run
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
// surplus thread (or core timeout allowed): wait at most keepAliveTime so an idle thread can exit
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
// core thread: block until a task arrives
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
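
The timed poll in getTask is what lets threads beyond corePoolSize exit after keepAliveTime, while core threads block in take indefinitely. A small sketch that makes this visible from the outside; the pool sizes, the 100 ms keep-alive and the 5 second wait bound are arbitrary values chosen for the demo:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size while three tasks are running, pool size after the idle timeout}.
    static int[] poolSizes() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int busy = pool.getPoolSize();
            release.countDown();
            // Surplus workers now wait in a timed poll and exit once keepAliveTime elapses.
            // Wait (bounded) until only the core thread is left.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] { busy, pool.getPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = poolSizes();
        System.out.println("busy=" + sizes[0] + " idle=" + sizes[1]);
    }
}
```

Under normal scheduling this reports three threads while busy and one after the timeout.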
Next, the runTask method:
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread does not
* have its interrupt set. This requires a double-check of state
* in case the interrupt was cleared concurrently with a
* shutdownNow -- if so, the interrupt is re-enabled.
*/
if (runState < STOP && Thread.interrupted() && runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute is called
* only if task completed or threw exception. Otherwise, the
* caught runtime exception will have been thrown by
* afterExecute itself, in which case we don't want to call it
* again.
*/
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
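beforeExecute and afterExecute have empty implementations in ThreadPoolExecutor; they are hooks for developers writing a custom thread pool, called before and after a thread runs each task to do extra work such as cleanup.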
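
As a sketch of how these hooks can be used, the hypothetical TimingPool below records how long each task takes and how many tasks have finished. The class, its fields and its accessor methods are assumptions for illustration:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class TimingPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicInteger finished = new AtomicInteger();

    TimingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs in the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finished.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    int finishedTasks() {
        return finished.get();
    }

    long totalNanos() {
        return totalNanos.get();
    }

    // Runs `tasks` empty tasks and returns how many the hooks counted.
    static int runDemo(int tasks) throws InterruptedException {
        TimingPool pool = new TimingPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.finishedTasks();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(5)); // prints 5
    }
}
```

Both hooks run in the worker thread that executes the task, which is why a ThreadLocal is enough to carry the start time from one hook to the other.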
Next, the ensureQueuedTaskHandled method:
/**
* Rechecks state after queuing a task. Called from execute when pool state
* has been observed to change after queuing a task. If the task was queued
* concurrently with a call to shutdownNow, and is still present in the
* queue, this task must be removed and rejected to preserve shutdownNow
* guarantees. Otherwise, this method ensures (unless addThread fails) that
* there is at least one live thread to handle this task
*
* @param command
*            the task
*/
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty())
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command);// let the handler deal with it
else if (t != null)
t.start();
}
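As its comment says, this method rechecks the pool state after a task has been added to the queue. If the pool is no longer RUNNING (for example, shutdownNow was called concurrently) and the task is still in the queue, the task is removed and rejected; otherwise the method makes sure at least one thread exists to run it.
Finally, the addIfUnderMaximumPoolSize method:
/**
* Creates and starts a new thread running firstTask as its first task, only
* if fewer than maximumPoolSize threads are running and pool is not shut
* down.
*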
* @param firstTask
*            the task the new thread should run first (or null if none)
* @return true if successful
*/
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

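To recap what execute does:
1. Check the task (null is not allowed).
2. If the pool has fewer threads than corePoolSize, call addIfUnderCorePoolSize to add a thread that runs the task, and return.
3. Otherwise, if the pool is RUNNING and the queue is not full, put the task in the queue. If the pool then turns out to be no longer RUNNING, or to contain no threads at all, call ensureQueuedTaskHandled.
4. If the queue is full, call addIfUnderMaximumPoolSize, which adds threads until the pool reaches maximumPoolSize. Once the pool is at maximumPoolSize, the policy defined by handler deals with the new task.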

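So the pool tries the following, in order, when handling a task:
core threads (corePoolSize), the task queue (workQueue), then extra threads up to maximumPoolSize; if all three are full, handler deals with the rejected task.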
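
This ordering can be checked with a small experiment. The sketch below uses a pool with corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1 (all arbitrary values chosen for the demo), submits four tasks that block, and records the pool size and queue size after each submission:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {
    static List<String> trace() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> trace = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.add("task" + i + ": pool=" + pool.getPoolSize()
                            + " queue=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    trace.add("task" + i + ": rejected"); // default AbortPolicy
                }
            }
            return trace;
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints:
        // task1: pool=1 queue=0   (new core thread)
        // task2: pool=1 queue=1   (queued)
        // task3: pool=2 queue=1   (queue full, extra thread)
        // task4: rejected         (queue full, pool at maximum)
        for (String line : trace()) {
            System.out.println(line);
        }
    }
}
```

Note that task3 starts running before task2 even though it was submitted later, because the new thread takes task3 as its first task while task2 stays in the queue.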

Next, the submit method, which is defined in AbstractExecutorService:
public <T> Future<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);// creates a FutureTask
execute(ftask);
return ftask;
}
The actual execution is delegated to the subclass through execute.
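
In other words, submit is roughly "wrap the Callable in a FutureTask, then call execute". The sketch below does the wrapping by hand and compares it with submit; the class and method names are assumptions for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class SubmitByHand {
    // Wraps the Callable in a FutureTask ourselves and passes it to execute().
    static String viaExecute(Callable<String> task) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            FutureTask<String> ftask = new FutureTask<>(task); // FutureTask is both Runnable and Future
            pool.execute(ftask);
            return ftask.get();
        } finally {
            pool.shutdown();
        }
    }

    // Lets submit() do the wrapping.
    static String viaSubmit(Callable<String> task) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<String> task = () -> "hello";
        System.out.println(viaExecute(task)); // prints "hello"
        System.out.println(viaSubmit(task));  // prints "hello"
    }
}
```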

Now a less commonly used method, getTaskCount:
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;// add the tasks each pool thread has completed
if (w.isActive())// an active worker is running a task right now
++n;
}

return n + workQueue.size();//累加上缓冲队列里的任务
} finally {
mainLock.unlock();
}
}
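
A short usage sketch: with one task running and two waiting in the queue, getTaskCount reports all three, while getCompletedTaskCount catches up only once they have finished. The single-thread pool and the latch-based setup are assumptions made to keep the counts stable; in a busy pool both values are only approximations.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TaskCountDemo {
    // Returns {task count while busy, completed count while busy, completed count after termination}.
    static long[] counts() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });
        pool.execute(() -> { });
        started.await();                                        // the first task is now running
        long total = pool.getTaskCount();                       // 1 running + 2 queued
        long completedWhileBusy = pool.getCompletedTaskCount(); // nothing has finished yet
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new long[] { total, completedWhileBusy, pool.getCompletedTaskCount() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] c = counts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]); // prints "3 0 3"
    }
}
```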

Next, another important method, shutdownNow:
public List<Runnable> shutdownNow() {
/*
* shutdownNow differs from shutdown only in that 1. runState is set to
* STOP, 2. all worker threads are interrupted, not just the idle ones,
* and 3. the queue is drained and returned.
*/
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // acquire the lock
try {
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}

int state = runState;
if (state < STOP)
runState = STOP; // set the state to STOP

try {
for (Worker w : workers) {
w.interruptNow();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}

List<Runnable> tasks = drainQueue();
tryTerminate(); // Terminate now if pool and queue empty
return tasks;
} finally {
mainLock.unlock();
}
}
As you can see, it mainly iterates over workers and calls interruptNow on each:
/**
* Interrupts thread even if running a task.
*/
void interruptNow() {
thread.interrupt();// interrupt the thread whether or not it is running a task
}
It then calls drainQueue and returns the tasks in workQueue that were never run.

The shutdown method differs mainly in this part:
if (state < SHUTDOWN)
runState = SHUTDOWN;

try {
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}

tryTerminate();
While iterating over workers it calls interruptIfIdle:
/**
* Interrupts thread if not running a task.
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
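Both shutdown and shutdownNow work by interrupting threads. The main logic is:
1. Set runState and interrupt the workers.
2. getTask checks runState and returns null if the pool is stopping (shutdownNow). If a blocked take throws InterruptedException, the exception is swallowed and runState is checked again on the next iteration.
3. The Worker's run method leaves its loop and calls workerDone.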
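
The observable effect of shutdownNow can be sketched as follows: the running task is interrupted and the queued tasks are handed back to the caller instead of being run. The class name, the single-thread pool and the summary string are assumptions for illustration:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    static String run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);  // a long-running task
            } catch (InterruptedException e) {
                interrupted.set(true); // shutdownNow interrupts running workers too
            }
        });
        pool.execute(() -> { });       // these two wait in the queue
        pool.execute(() -> { });
        started.await();               // make sure the first task is running

        List<Runnable> pending = pool.shutdownNow(); // drains the queue and interrupts the worker
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return "pending=" + pending.size()
                + " interrupted=" + interrupted.get()
                + " terminated=" + terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "pending=2 interrupted=true terminated=true"
    }
}
```

With shutdown instead of shutdownNow, the sleeping task would not be interrupted and the two queued tasks would still run, so the pool would only terminate after the sleep finished.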

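One last point: newCachedThreadPool allows up to Integer.MAX_VALUE threads in the pool, which is not realistic in practice. Each thread reserves its own stack, commonly about 1 MB by default depending on the JVM and platform, so keep this in mind when using it.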
