sequenceDiagram
%% Two participants, Client and Future, exchanging four messages:
%% an asynchronous hand-off of the Future, then a synchronous value fetch.
%% "-)" is Mermaid's asynchronous (open-arrow) message.
Client-)Future: [Async] Produce Future
Future-)Client: [Async] return Future
%% "->>" is a solid-arrow message; the "+" suffix activates Future for the
%% duration of the synchronous request.
Client->>+Future: [Sync] get Value
%% The "-" suffix deactivates Future as it replies with a value or an exception.
Future->>-Client: [Sync] return Value Or Except
Callable 转 Future 的源码追踪
首先我们从任务提交开始，在 AbstractExecutorService 中的源码如下：
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// ---- AbstractExecutorService ----

/**
 * Wraps the given callable in a RunnableFuture, passes it to execute(),
 * and returns that same future to the caller.
 *
 * @param task the callable to run
 * @return the RunnableFuture created for {@code task}
 * @throws NullPointerException if {@code task} is null
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

/**
 * Callable overload of newTaskFor. This is the overload that the
 * single-argument call in submit(Callable) resolves to; it leads to the
 * FutureTask(Callable) constructor shown below.
 *
 * @param callable the callable to wrap
 * @return a new FutureTask wrapping {@code callable}
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

/**
 * Runnable overload of newTaskFor. It takes a Runnable plus a value and
 * forwards both to the two-argument FutureTask constructor. It is not the
 * overload selected by submit(Callable) above, which passes one Callable.
 *
 * @param runnable the runnable to wrap
 * @param value    the value forwarded to the FutureTask constructor
 * @return a new FutureTask built from {@code runnable} and {@code value}
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

// ---- FutureTask ----

/**
 * Stores the callable and sets the task state to NEW.
 *
 * @param callable the callable to store
 * @throws NullPointerException if {@code callable} is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
flowchart TD
%% Task state transitions, drawn top-down from the initial NEW state.
%% The state names match the constants used by run()/get()/report().
%% B is a decision node: which operation is applied to a NEW task.
A[NEW 初始状态] --> B{操作类型}
%% Three possible operations: run(), cancel(false), cancel(true).
B --> C[调用 run 方法]
B --> D[调用 cancel false]
B --> E[调用 cancel true]
%% run() passes through the transient COMPLETING state;
%% cancel(false) goes straight to CANCELLED;
%% cancel(true) passes through the transient INTERRUPTING state.
C --> F[COMPLETING 正在完成]
D --> G[CANCELLED 已取消]
E --> H[INTERRUPTING 正在中断]
%% COMPLETING resolves to NORMAL or EXCEPTIONAL depending on the outcome.
F --> I{执行结果}
I --> J[NORMAL 正常完成]
I --> K[EXCEPTIONAL 异常完成]
H --> L[INTERRUPTED 已中断]
%% M is a diagram-only sink node grouping the four terminal states.
J --> M[最终状态]
K --> M
G --> M
L --> M
publicvoidrun() { // 状态不为new 或者 执行线程不为空,结束 run // CAS 更新 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // call 执行成功,设置结果 set(result); } } finally { // runner must be non-null until state is settled to prevent concurrent calls to run() // CAS的条件 runner = null; // state must be re-read after nulling runner to prevent leaked interrupts ints= state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get()throws InterruptedException, ExecutionException { ints= state; // 未完成,阻塞等待 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) thrownewNullPointerException(); ints= state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) thrownewTimeoutException(); return report(s); } private V report(int s)throws ExecutionException { Objectx= outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) thrownewCancellationException(); thrownewExecutionException((Throwable)x); }