classDiagram
%% Type hierarchy of the executor classes and interfaces discussed below.
%% Arrow legend:
%%   A --|> B   generalization: class extends class, or interface extends interface
%%   A ..|> B   realization: class implements interface
class ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor --|> ThreadPoolExecutor
ScheduledThreadPoolExecutor ..|> ScheduledExecutorService
class ThreadPoolExecutor
ThreadPoolExecutor --|> AbstractExecutorService
class ForkJoinPool
ForkJoinPool --|> AbstractExecutorService
class AbstractExecutorService
<<abstract>> AbstractExecutorService
AbstractExecutorService ..|> ExecutorService
%% Interface-to-interface edges are inheritance, so they use the solid arrow.
ScheduledExecutorService --|> ExecutorService
class ScheduledExecutorService
<<interface>> ScheduledExecutorService
class ExecutorService
<<interface>> ExecutorService
ExecutorService --|> Executor
class Executor
<<interface>> Executor
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); // step 1 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // step 2 if (isRunning(c) && workQueue.offer(command)) { // 再次获取ctl的值recheck intrecheck= ctl.get(); // 如果当前线程池的状态不是RUNNING,并且从队列workQueue移除command成功的话, // 调用reject()方法拒绝任务command if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) //如果当前工作线程woker数目为0,尝试添加新的worker线程,但是不携带任务 addWorker(null, false); // 注意此处task为null } // step 3 elseif (!addWorker(command, false)) reject(command); }
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
//使用CAS机制尝试将当前线程数+1 //如果是核心线程当前线程数必须小于corePoolSize //如果是非核心线程则当前线程数必须小于maximumPoolSize //如果当前线程数小于线程池支持的最大线程数CAPACITY 也会返回失败 for (;;) { intwc= workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
//这里已经成功执行了CAS操作将线程池数量+1,下面创建线程 booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { w = newWorker(firstTask); //Worker内部有一个Thread,并且执行Worker的run方法,因为Worker实现了Runnable finalThreadt= w.thread; if (t != null) { //这里必须同步,在状态为运行的情况下将Worker添加到workers中 finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intrs= runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); workers.add(w); //把新建的woker线程放入集合保存,这里使用的是HashSet ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //如果添加成功则运行线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果woker启动失败,则进行一些善后工作,比如说修改当前woker数量等等 if (! workerStarted) addWorkerFailed(w); // 内部也使用了 lock } return workerStarted; }
privatefinalclassWorker extendsAbstractQueuedSynchronizer implementsRunnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ privatestaticfinallongserialVersionUID=6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatilelong completedTasks;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** Delegates main run loop to outer runWorker. */ publicvoidrun() { runWorker(this); }
// Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state.
shutdown(): Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down. This method does not wait for previously submitted tasks to complete execution. Use {@link #awaitTermination awaitTermination} to do that. shutdownNow(): Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. These tasks are drained (removed) from the task queue upon return from this method. This method does not wait for actively executing tasks to terminate. Use {@link #awaitTermination awaitTermination} to do that. There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. This implementation cancels tasks via {@link Thread#interrupt}, so any task that fails to respond to interrupts may never terminate.