上篇博客是线程池的源码分析,这篇是关于延时线程池的源码分析(JDK1.8)。
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,实现了ScheduledExecutorService接口。ScheduledThreadPoolExecutor 支持延后执行给定task,或是定期执行。任务在到时间后执行,但是没有任何的实时保证。对于计划执行时间相同的任务,会按照提交顺序,先进先出执行。
1
| public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
|
ScheduledThreadPoolExecutor
1 2 3 4 5 6 7 8 9 10 11 12
| // Constructor overloads. Each one forwards to the superclass constructor with
// Integer.MAX_VALUE as the maximum pool size, a keep-alive of 0 NANOSECONDS and a
// new DelayedWorkQueue as the work queue; they differ only in the optional
// ThreadFactory / RejectedExecutionHandler arguments.
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

// Same as above, with a caller-supplied thread factory.
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}

// Same as above, with a caller-supplied rejection handler.
public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);
}

// Same as above, with both a thread factory and a rejection handler.
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
}
|
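从构造器可以看出,最大线程数为Integer.MAX_VALUE,任务队列是DelayedWorkQueue。需要注意的是,DelayedWorkQueue是无界队列(offer总是返回true),所以maximumPoolSize实际上不起作用:worker线程只会按corePoolSize创建(corePoolSize为0时至少启动一个,参见后文的ensurePrestart)。DelayedWorkQueue也是定时线程池实现定时功能的核心(ScheduledThreadPoolExecutor继承自ThreadPoolExecutor)。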
添加延时任务
ScheduledThreadPoolExecutor实现的接口ScheduledExecutorService,除了继承ExecutorService的方法外,还额外声明了下面几个方法:
1 2 3 4 5 6 7 8 9 10 11 12
| public interface ScheduledExecutorService extends ExecutorService { ... public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit); }
|
下面是ScheduledThreadPoolExecutor对ExecutorService的submit方法的实现。可以看到submit最终也是调用schedule(延时为0),再结合ScheduledExecutorService的扩展方法,我们可以知道延时任务的创建都是通过ScheduledExecutorService中的四个方法实现的。
1 2 3 4 5 6 7 8 9 10 11
| // Submits a Runnable by scheduling it with a delay of 0 NANOSECONDS.
public Future<?> submit(Runnable task) {
    return schedule(task, 0, NANOSECONDS);
}

// Adapts the Runnable and its result with Executors.callable, then schedules the
// resulting Callable with a delay of 0 NANOSECONDS.
public <T> Future<T> submit(Runnable task, T result) {
    return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}

// Submits a Callable by scheduling it with a delay of 0 NANOSECONDS.
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}
|
我们来依次分析下这四个方法的源码,这四个方法可以分为one-shot(一次)和period(周期)两类:
执行一次的延时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| // Schedules a one-shot Runnable.
// Throws NullPointerException when command or unit is null. The command is wrapped
// in a ScheduledFutureTask whose trigger time comes from triggerTime(delay, unit),
// passed through decorateTask, handed to delayedExecute, and returned to the caller.
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

// Schedules a one-shot Callable; same flow as the Runnable overload, using the
// ScheduledFutureTask constructor that takes a Callable.
// Throws NullPointerException when callable or unit is null.
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
|
周期任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| // Schedules a periodic task with a fixed rate.
// Throws NullPointerException when command or unit is null and
// IllegalArgumentException when period <= 0.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // The period is stored as a POSITIVE nanosecond value; setNextRunTime treats a
    // positive period as "add the period to the previous trigger time".
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // Remember the decorated task so that run() re-enqueues it (via outerTask)
    // instead of the undecorated sft.
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
// Schedules a periodic task with a fixed delay between runs.
// Throws NullPointerException when command or unit is null and
// IllegalArgumentException when delay <= 0.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // The delay is stored as a NEGATIVE nanosecond period; setNextRunTime treats a
    // non-positive period as "recompute the trigger time via triggerTime(-p)".
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // Remember the decorated task so that run() re-enqueues it (via outerTask).
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
|
delayedExecute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| // Common entry point for one-shot and periodic tasks.
// Rejects the task when the pool is already shut down; otherwise enqueues it and
// makes sure a worker exists to pick it up.
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        // Re-check after enqueueing: if the pool was shut down in the meantime, the
        // task may not run in the current state, and it could still be removed from
        // the queue, cancel it (without interrupting); otherwise ensure a worker.
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
// Delegates to isRunningOrShutdown, passing the after-shutdown policy flag that
// matches the task kind: continueExistingPeriodicTasksAfterShutdown for periodic
// tasks, executeExistingDelayedTasksAfterShutdown for one-shot tasks.
boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}
// Ensures that a worker is available for queued tasks.
// Adds a worker (with no first task) flagged as core while the worker count is
// below corePoolSize; otherwise adds a single non-core worker only when there are
// currently no workers at all.
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}
|
DelayedWorkQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; private Thread leader = null; private final Condition available = lock.newCondition(); ... public void put(Runnable e) {offer(e);} public boolean add(Runnable e) {return offer(e);} public boolean offer(Runnable e, long timeout, TimeUnit unit) {return offer(e);} ... }
|
添加任务
通过上面的代码分析可知,向queue中添加task时调用的是add方法,而add、put以及带超时的offer最终都是调用offer(Runnable);DelayedWorkQueue重写了offer方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| // Inserts a task into the array-backed heap while holding the queue lock.
// Throws NullPointerException for a null argument; always returns true.
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // Grow the backing array when it is full.
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            // First element: it becomes the root directly.
            queue[0] = e;
            setIndex(e, 0);
        } else {
            // Otherwise start at the new last slot and sift up.
            siftUp(i, e);
        }
        // The new task became the head: clear the leader and signal one waiter on
        // `available` so the wait can be redone against the new head.
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

// Enlarges the backing array by 50% (oldCapacity + oldCapacity >> 1); on int
// overflow the capacity is set to Integer.MAX_VALUE.
private void grow() {
    int oldCapacity = queue.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1);
    if (newCapacity < 0)
        newCapacity = Integer.MAX_VALUE;
    queue = Arrays.copyOf(queue, newCapacity);
}

// Records the heap position on the task, but only for ScheduledFutureTask
// instances (other RunnableScheduledFuture implementations are left untouched).
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
        ((ScheduledFutureTask)f).heapIndex = idx;
}
// Places `key` into the heap starting from slot k and moving towards the root.
// While the parent compares greater than `key`, the parent is moved down one level
// (and its recorded index updated); `key` is finally stored in the freed slot.
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // Parent of slot k in the array-backed binary heap.
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        // key is not smaller than its parent: the position is found.
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}
|
下面我们来看下,如何从DelayedWorkQueue中获取暂存的task.
poll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| // Non-blocking retrieval: returns null when the queue is empty or the head task's
// remaining delay is still positive; otherwise removes and returns the head.
public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

// Timed retrieval: waits up to `timeout` for a head task whose delay has expired.
// Returns null when the timeout budget is used up first.
// Throws InterruptedException from lockInterruptibly / awaitNanos.
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                // Empty queue: give up if no time is left, else wait for a signal.
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                if (nanos <= 0)
                    return null;
                // Drop the local reference to the head before blocking.
                first = null;
                // Wait for the remaining timeout when it is shorter than the head's
                // delay or another thread is already the leader.
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // Become the leader and wait exactly for the head's delay.
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        // Charge the time actually waited against the timeout.
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On the way out, signal one waiter if nobody is leader and tasks remain.
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

// Removes the head task `f`: takes the last element out of the array, re-inserts
// it from the root via siftDown (unless the queue became empty), marks `f` as no
// longer in the heap (index -1) and returns it.
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}
// Places `key` into the heap starting from slot k and moving towards the leaves.
// At each level the smaller child is chosen; while `key` compares greater than
// that child, the child is moved up (and its recorded index updated).
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    // Slots at index >= half have no children.
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        // Prefer the right child when it exists and is smaller than the left one.
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // key is not greater than the smaller child: the position is found.
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}
|
take
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| // Blocking retrieval: waits until the head task's delay has expired, then removes
// and returns it. Throws InterruptedException from lockInterruptibly / await.
//
// Only one thread at a time (the leader) performs a timed wait for the head's delay;
// every other thread waits indefinitely on `available` until it is signalled.
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                // Empty queue: wait until offer() signals.
                available.await();
            else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                // Don't retain a reference to the head while waiting (matches
                // poll(long, TimeUnit)); the task may be removed or cancelled meanwhile.
                first = null;
                if (leader != null)
                    // Another thread is already timing the head: wait as a follower.
                    available.await();
                else {
                    // Become the leader and wait exactly for the head's delay.
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // Step down unless offer() already cleared or replaced the leader.
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On the way out, signal one waiter if nobody is leader and tasks remain.
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
|
take 过程中leader thread的说明
假设,有 T1, T2, T3 三个 worker 线程,在竞争获取 queue[0] 任务。
- 当 T1 成功执行到 take 的循环中的最后一个 else 分支时,会先把 leader 设置成 T1,然后调用 available.awaitNanos(delay); 进入限时等待。
- 此时处于 wait 过程的线程 T1, 将放弃锁 lock, 所以 T2, T3 又开始竞争锁
- 假设 T2 获得锁,进行循环执行到 else if (leader != null) ,显然 leader 已经是 T1 了,所以这个判断成立,所以 T2 会进入 available.await();
- 当 delay 时间已过,T1 从 available.awaitNanos(delay) 中被唤醒,重新获取锁之后将 leader 置为 null,并在下一轮循环中取出 queue[0];在 T1 返回之前,执行最后的 finally 语句,如果此时 leader 为 null 且队列不为空,就会通知像 T2 这样的线程(处于 available.await()),使得 T2 可以立即参与到下一次获取 queue[0] 的过程中。
- 此时,T1 获取了任务,正在执行,T2 被 T1 唤醒,T2 和 T3 又开始了第一个步骤中的过程一样进行任务获取了。
通过这三个 worker 线程获取任务的过程可知:leader 的使用,使得 worker 线程的 wait 最小化,尽量使得 worker 能够参与到任务的获取和执行中来。如果不使用 leader, 则 T1, T2, T3 线程最终都进入 available.awaitNanos(delay); 的过程,而对于一个任务来说,多个线程在其上等待是没有意义的,因为最终只需要一个线程来执行任务,而不是所有在其上 wait 的线程,所以 leader 其实也可以认为是当前 queue[0] 最终可以被获取成功的那个线程。显然应该只有一个。
ScheduledFutureTask
classDiagram
ScheduledFutureTask --|>FutureTask
ScheduledFutureTask ..|> RunnableScheduledFuture
FutureTask ..|> RunnableFuture
RunnableScheduledFuture..|> RunnableFuture
RunnableScheduledFuture..|> ScheduledFuture
ScheduledFuture..|> Future
ScheduledFuture..|> Delayed
RunnableFuture..|> Future
RunnableFuture..|> Runnable
<<interface>> RunnableScheduledFuture
<<interface>> RunnableFuture
<<interface>> ScheduledFuture
<<interface>> Future
<<interface>> Delayed
<<interface>> Runnable
ScheduledFutureTask 实现的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| // A Comparable type that reports a delay value in a caller-chosen TimeUnit.
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

// Combines Delayed and Future; declares no members of its own.
public interface ScheduledFuture<V> extends Delayed, Future<V> {}

// Combines Runnable and Future, re-declaring run().
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

// Combines RunnableFuture and ScheduledFuture and adds a query telling whether the
// task is periodic.
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    boolean isPeriodic();
}
|
ScheduledFutureTask 源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
| // FutureTask subclass that carries the scheduling state of a delayed or periodic task.
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
    // Tie-breaker used by compareTo when two tasks have the same trigger time.
    private final long sequenceNumber;
    // Trigger time; compared against now() in getDelay, which converts from NANOSECONDS.
    private long time;
    // 0 for one-shot tasks; positive for fixed-rate, negative for fixed-delay
    // (see setNextRunTime).
    private final long period;
    // The task that is re-enqueued after a periodic run; defaults to this task and is
    // replaced by the decorated task in the scheduleXxx methods.
    RunnableScheduledFuture<V> outerTask = this;
    // Position in the delay queue's heap array; set to -1 by finishPoll on removal.
    int heapIndex;

    // One-shot Runnable task (period 0).
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // Periodic Runnable task with the given (signed) period.
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // One-shot Callable task (period 0).
    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // Remaining delay (time - now()), converted to the requested unit.
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    // Orders tasks by trigger time and then by sequence number. For a Delayed that is
    // not a ScheduledFutureTask, the remaining delays are compared instead.
    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    // A task is periodic when its period is non-zero.
    public boolean isPeriodic() {
        return period != 0;
    }

    // Computes the next trigger time of a periodic task: a positive period is added to
    // the previous trigger time; otherwise the time is recomputed via triggerTime(-p).
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }

    // Returns now() plus the delay; delays of at least Long.MAX_VALUE >> 1 are first
    // passed through overflowFree.
    // NOTE(review): presumably a member of the enclosing executor rather than of this
    // task class — confirm against the library source.
    long triggerTime(long delay) {
        return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    // Cancels via FutureTask.cancel; when that succeeds, removeOnCancel is set and the
    // task is still in the heap (heapIndex >= 0), the task is also removed.
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }

    // Runs the task: cancels it when it may not run in the current pool state; runs
    // a one-shot task once; for a periodic task, runs and resets it and, if that
    // succeeds, computes the next trigger time and re-enqueues outerTask.
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }

    // Re-enqueues a periodic task when the pool state still allows periodic tasks;
    // after enqueueing, the state is re-checked and the task is removed and cancelled
    // if it may no longer run, otherwise a worker is ensured.
    // NOTE(review): calls super.getQueue(), which FutureTask does not declare, so this
    // method presumably belongs to the enclosing executor — confirm against the
    // library source.
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
}
|