JUC-16-Future

JUC-16-Future

1. Future 接口

  • Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果等操作。

  • 必要时通过get 方法获取执行结果,该方法会阻塞直到任务返回结果。

  • 当一个线程需要等待另外一个线程把某个任务执行完成后它才能继续执行,此时可以似乎用FutureTask

  • Future类位于java.util.concurrent包下,它是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/** 
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's <tt>get</tt> method
*/
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task. *
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns <tt>true</tt> if this task was cancelled before it completed
* normally.
*/
boolean isCancelled();

/**
* Returns <tt>true</tt> if this task completed.
*
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

  • cancel(
    • 用来取消任务,如果取消成功返回true,取消失败返回false.
    • 参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled()
    • 方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone()
    • 方法表示任务是否已经完成,若任务完成,则返回true;
  • get()
    • 用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。
  • get(long timeout, TimeUnit unit)
    • 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

总结:

  • Future 提供以下三种功能
    • 判断任务是否完成。
    • 能够中断任务。
    • 能够获取任务执行的结果。
  • 因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

2. FutureTask 类

概述

  • FutureTask 类的实现基于 AQSJUC 中很多可阻塞的类都是基于AQS实现的。
  • AQS是一个同步框架,它提供通用机制来控制原子性管理同步状态,阻塞和唤醒线程,以及维护被阻塞线程的队列。
    • JDK6 中被广泛使用
    • 基于AQS的实现同步器包括 : (ReetrantLock , Semaphore,CountDownLatch,ReetrantReadWriteLock,FutureTask)
  • 首先是介绍下FutureTask类中的变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//表示当前task的状态
private volatile int state;
//表示当前task尚未执行
//场景:1、任务刚新建还未入队列
// 2、任务刚新建已入队列,还未被线程执行
// 3、任务刚新建已入队列,正在被线程执行run() 注意:当执行任务有结果后才会修改当前任务状态
private static final int NEW = 0;
//表示当前task正在结束,但是还未完全结束的一种临界状态
private static final int COMPLETING = 1;
//表示当前task正常结束
private static final int NORMAL = 2;
//表示当前task执行过程中发生异常,内部封装的callable.run() 向上抛出了异常
private static final int EXCEPTIONAL = 3;
//表示当前task被取消
private static final int CANCELLED = 4;
//表示当前task正在中断中
private static final int INTERRUPTING = 5;
//表示当前task已中断
private static final int INTERRUPTED = 6;

//submit(runnable/callable) runnable 使用装饰者模式装饰为 callable
private Callable<V> callable;
//正常情况下:任务执行结束,outcome保存执行结果。 即callable的返回值
//异常情况下:callable向上抛出异常信息,outcome保存异常信息
private Object outcome;
//当前任务被执行时,保存执行当前任务的当前线程的对象引用
private volatile Thread runner;
//因为会有很多线程去get当前任务的结果。所以,这里使用 链表 这种数据结构
private volatile WaitNode waiters;

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
//thread变量 保存线程对象引用
//next变量 保存当前节点的下一个节点
WaitNode() { thread = Thread.currentThread(); }
}
  • 构造器(2个构造器)
1
2
3
4
5
6
7
8
9
10
11
public FutureTask(Callable<V> callable) {  
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

可以看到,Runnable注入会被Executors.callable()函数转换为Callable类型,即FutureTask最终都是执行Callable类型的任务。

这里使用到了适配器模式:将Runnable 接口 的任务 转换成 Callable接口的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static <T> Callable<T> callable(Runnable task, T result) {  
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
  • 执行一个task的步骤是:
    • submit(runnable/callable) -> 调用 sumbit()提交我们自定义的工作单元
    • newTaskFor(runnable/callable) -> 调用newTaskFor将runnable/callable 封装成FutureTask
    • execute(task) -> 执行execute()将传入的task放到线程池中
    • threadpool -> 当线程池中有空闲线程,就会执行任务的run()。否则,就会进入任务队列等待被执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//submit(runnable/callable) -> newTaskFor(runnable/callable) -> execute(task) -> threadpool
//当线程池中没有空闲线程来执行当前任务,则会先进入任务队列中,等待空闲线程调用;如果有就直接执行。
//任务执行入口
public void run() {
//条件1:true -> 当前任务的状态不是新建状态。(可能已经被执行或者取消)
//条件2:true -> cas失败,表示当前有其它线程抢占了这个任务
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;

//当前线程开始执行当前任务
//前置条件:1、当前任务的状态是新建状态 2、当前线程抢占到当前任务的执行权
try {
//表示外部传入的自定义的业务程序。 是callable或者是用runnable装饰后的callable
Callable<V> c = callable;
//条件1:防止空指针异常
//条件2:防止有外部线程将任务cancel了
if (c != null && state == NEW) {
//要返回的结果
V result;
//true -> callable程序执行成功,未抛出异常
//false -> callable程序执行失败,抛出异常
boolean ran;
try {
//执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//说明当前任务正常结束
//设置result给outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
//只会在当前任务被cancel时执行
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void setException(Throwable t) {
//使用cas把任务当前状态设置成完成中
//执行失败的情况:外部线程在当前线程执行set之前把当前任务cancel(很小概率事件)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将callable
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//唤醒waiters中的所有线程
finishCompletion();
}
}
protected void set(V v) {
//使用cas把任务当前状态设置成完成中
//执行失败的情况:外部线程在当前线程执行set之前把当前任务cancel(很小概率事件)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//将结果赋值给outcome后,将当前任务的状态直接修改为NORMAL(正常完成)状态
//putOrderedInt(Object obj, long offset, int value) obj:包含要修改field的对象 offset:obj中整型field的偏移量 value:field将被设置的新值
//设置obj对象中offset偏移地址对应的整型field的值为指定值。
//这是一个有序或者有延迟的putIntVolatile方法,并且不保证值的改变被其他线程立即看到。
//只有在field被 volatile 修饰并且期望被意外修改的时候使用才有用。
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//唤醒waiters中的所有线程
finishCompletion();
}
}
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
//释放cpu资源
Thread.yield(); // wait out pending interrupt
}
  • get () 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//场景:多个线程等待当前任务执行完成后的结果
public V get() throws InterruptedException, ExecutionException {
//获取当前任务状态
int s = state;
//true -> 任务状态可能为 未执行、正在执行、完成中
//当任务状态为以上情况时,调用get()的外部线程会被阻塞
if (s <= COMPLETING)
//返回线程当前状态
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//System.nanoTime() 返回的是纳秒,nanoTime返回的可能是任意时间,甚至可能是负数
//timed true -> 表示带超时时间,deadline = System.nanoTime()
// false -> 表示不带超时时间,deadline = 0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//引用当前线程封装成WaitNode对象
WaitNode q = null;
//表示是否入队
boolean queued = false;
//自旋
for (;;) {
//ture -> 说明当前线程是被其它线程中断而唤醒的
//interrupted() 返回true后 会将Thread的中断标记重置为false
if (Thread.interrupted()) {
//当前线程node出队
removeWaiter(q);
//向get()抛出中断异常
throw new InterruptedException();
}

//当前线程被其它线程正常唤醒(使用了unpark(thread))的场景,继续执行自旋逻辑
//获取当前任务最新状态
int s = state;
//true -> 表示当前任务已经有结果了
if (s > COMPLETING) {
//true -> 说明已经为当前线程创建node节点,需要执行 node.thread = null (helpGC)
if (q != null)
q.thread = null;
//返回任务状态
return s;
}
//true -> 表示当前任务正在完成中
else if (s == COMPLETING) // cannot time out yet
//将当前cpu的资源释放,进行下次抢占cpu资源
//注意:yield()会释放CPU资源,但是是与其它线程一起重新抢占资源。当前线程可能还会获取到执行权,也有可能被其他线程获取到
Thread.yield();

//场景:第一次自旋
//true -> 当前线程还未创建WaitNode对象
else if (q == null)
//为当前线程创建WaitNode对象,并把当前线程赋值到thread全局变量中
q = new WaitNode();
//场景:第二次自旋
//true -> 当前线程已经创建WaitNode对象,但是还未入队
else if (!queued){
//把当前线程node节点的next指针 指向到原队列的头结点
//waiters 表示队列头指针
q.next = waiters;
//用cas方式设置waiters指针指向当前线程的node节点(头插法)
//true -> 入队成功
//false -> 有其它线程抢先入队,进入下次自旋
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
}
//场景:第三次自旋
//判断是否有带入超时时间
else if (timed) {//有超时时间的场景
nanos = deadline - System.nanoTime();
//true -> 表示等待超时了
if (nanos <= 0L) {
//当前线程node出队
removeWaiter(q);
//无论完成与否,直接返回当前状态
return state;
}
//当前线程进入有时间限制的休眠状态,有其它线程将其 唤醒 或者 中断 或者 时间到了 就会重新激活
LockSupport.parkNanos(this, nanos);
}
else//无超时时间的场景
//park() 会将当前线程状态变为waitting(休眠)
//当前线程进入休眠状态, 有其它线程将其 唤醒 或者 中断 就会重新激活
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
//正常情况下:outcome保存的是callable运行出来的结果
//非正常情况下:outcome保存的是callable运行时抛出的异常信息
Object x = outcome;
//true -> 表示当前任务正常完成
if (s == NORMAL)
//直接返回callable运行结果
return (V)x;
//true -> 表示当前任务被取消
if (s >= CANCELLED)
//抛出取消中断异常
throw new CancellationException();
//抛出自定义的callable程序执行时产生的异常
throw new ExecutionException((Throwable)x);
}

注意 :

  1. get任务执行结果的线程并不只有一个,而是有很多个在同一链表中的线程竞争获取。
  2. 当任务还未执行完毕,线程就会进行休眠,等待其它线程唤醒(可能是正常唤醒,也可能是任务中断)
  • cancel()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//mayInterruptIfRunning 设成false话,不允许在线程运行时中断,设成true的话就允许。
public boolean cancel(boolean mayInterruptIfRunning) {
//state == NEW:true -> 表示当前任务处于运行中 或者 处于线程池队列中
//UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
// true -> 表示cas成功,将当前任务状态修改为 中断中 或者 取消
//注意,这里的if取反,上述条件都为true才会执行下面的逻辑。否则直接返回false
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;

try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
//获取执行当前task的线程
Thread t = runner;
//可能会为null,因为可能当前task还在任务队列中排队
if (t != null)
//如果不为null,则给runner线程一个中断信号
//只是会给任务一个中断标志,能否中断要看task中是否有响应中断的程序
t.interrupt();
} finally { // final state
//执行完中断后将任务的状态改为已中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒waiters中的所有线程
finishCompletion();
}
return true;
}
  • 最后要讲下finishCompletion()这个方法,看了上面的源码可以发现,有3个地方都执行了这个方法,分别是cancel()、set()、setException()。
    其作用就是当任务执行有结果了(无论好坏)都会唤醒waiters链表中的 所有 线程继续执行自旋逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//唤醒所有waiters中的线程
private void finishCompletion() {
// assert state > COMPLETING;
//将waiters头结点赋值给q
for (WaitNode q; (q = waiters) != null;) {
//用cas将头结点置为null,防止该任务被其它线程取消 且 help gc
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {//自旋
//获取当前节点的thread对象
Thread t = q.thread;
//true -> thread对象存在防止空指针
if (t != null) {
//清空当前节点引用的thread对象 help gc
q.thread = null;
//唤醒该线程
LockSupport.unpark(t);
}
//获取当前节点的下一个节点
WaitNode next = q.next;
//true -> 已经处于队列末尾
if (next == null)
//退出自旋
break;
//清空当前节点的next指针指向的对象引用 help gc
q.next = null; // unlink to help gc
//当前节点指针指向下一个节点
q = next;
}
//所有waiters队列中的线程唤醒完成
break;
}
}
//自定义扩展的操作
done();
//释放资源 help gc
callable = null; // to reduce footprint
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信