Java并发——FutureTask类源码解析

    众所周知,在线程执行的run方法是一个void方法,它是不返回任何结果与状态的,但是FutureTask可以用来在线程执行任务后,返回任务的结果。FutureTask类实现了Future接口的语义,利用Callable接口,即自定义任务来完成这一功能。

    先来看看Future接口的语义:

public interface Future<V> {
    //取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    //是否已经取消
    boolean isCancelled();
    //是否任务完成
    boolean isDone();
    //获取任务执行的结果
    V get() throws InterruptedException, ExecutionException;
    //在一定时间内等待任务执行结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

    Future接口的语义包含着可以取消任务,以及获取任务执行后的结果,为了更好的理解FutureTask类,来看看下面这个例子:

public class Test {
    public static void main (String args[]) throws InterruptedException, ExecutionException {
       FutureTask<String> task=new FutureTask<String>(new Callable<String>() {
           @Override
           public String call() throws Exception {
               Thread.sleep(3000);
               return "蕾姆";
           }
       });
       Thread thread=new Thread(task);
       thread.start();
       System.out.println("三秒后,我接到"+task.get());
    }
}
//输出:
//三秒后,我接到蕾姆

    从代码中可以看出,即便任务执行了三秒多钟,但是当任务执行完成后,会立即返回蕾姆这一结果。需要注意的是,FutureTask类中自定义的回调任务,可能会被多个线程执行,但是FutureTask类中的run方法只会执行一次,也就是说其它线程利用get方法得到的结果是第一个线程执行完毕的结果

    FutureTask类实现了RunnableFuture接口,RunnableFuture接口继承了Future接口以及Runnable接口,借此FutureTask类的任务才能在线程中执行。FutureTask类的任务是由Callable接口语义实现的,Callable即回调,FutureTask类中get方法就是等待run方法(包括调用者自定义的方法)执行完成,才返回结果。接下来深入它的源码,从它的构造方法讲起:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    需要从外部传入一个Callable接口实现类,也是传入调用者自定义的需要返回结果的方法。其中有几大状态需要进行理解:

    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
  1. NEW状态:表示任务还未开始执行;
  2. COMPLETING状态:表示任务已完成,处于设置结果的状态;
  3. NORMAL状态:表示任务已经完成,结果也已经设置好了,这是正常的最终状态;
  4. EXCEPTIONAL:表示任务执行的时候跑抛出了异常的状态;
  5. CANCELLED:表示任务已被取消的状态;
  6. INTERRUPTING:表示任务运行过程中被中断,设置中断状态的中间状态;
  7. INTERRUPTED:表示任务已被中断,中断状态已被设置好;

    转态转换一般有如下几点:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

    理解了几个状态后,我们来看看FutureTask类的run方法:

public void run() {
        //如果state不是初始的状态
        //或者CAS设置当前线程失败
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            //如果自定义任务不为空且状态是初始化的状态
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //运行调用者自定义的任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //CAS操作设置运行的结果
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //设置结果
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

    详细的讲解run方法究竟做了什么事,首先。它判断状态是否是初始化或者CAS操作设置当前线程独占是否成功,当上述两者只要有一点不满足的时候,代表着FutureTask类自定义的任务已被其它线程已在执行了,当前线程只需要直接使用另外一个线程执行的结果就可以了,这就是FutureTask类只要被一个线程执行过,那么其它处理该任务的线程会直接使用结果的原因。接着,它判断自定义的任务是否为空且状态是初始的状态,满足就执行自定义的任务,并且在成功后利用set方法设置结果,set方法内部利用CAS原子操作来设置任务的状态,标记为NORMAL状态。再来看看FutureTask类如何等待run方法的执行完成:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    从源码中看出,它是利用任务的状态来完成的,当状态值小于等于COMPLETING时,代表着任务的结果还未设置完成或者任务还在执行又或者还未开始,这是后就需要等待,调用awaitDone方法进行等待:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
      //......
        for (;;) {
        //如果线程被中断,抛出异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            //状态大于COMPLETING值,返回状态值
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
          //......
    }

    利用一个循环一直判断任务的状态值,如果大于COMPLETING,那么就返回状态值,因为这时候可能完成了,也有可能中断或者被取消了,接着get方法调用report处理状态值:

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    这时候如果状态值等于NORMAL,那么就正常返回自定义任务的执行结果,否则根据状态值抛出对应的异常。

    总结一下FutureTask类的执行流程,首先线程执行run方法,在run方法里设置当前线程独占,其它线程只需要返回第一个线程计算的结果就行,也在run方法里运行自定义的任务,运行成功就设置任务状态值,调用get方法,如果任务状态值大于COMPLETING,那么直接根据状态值返回结果或者抛出对应的异常,否则就等待状态值的改变。