扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本篇内容介绍了“如何结合线程池理解FutureTask及Future源码”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联是专业的宕昌网站建设公司,宕昌接单;提供成都做网站、成都网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行宕昌网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
多线程Runnable和Callable接口这里就不多说了,Callable有返回值,Runnable无返回值。
public class FutureTaskTest { public static void main(String[] args) { ExecutorService executor = null; try { //线程池提交Runnable接口任务 executor.execute(new MyRunnable()); //线程池提交Callable接口任务 executor = Executors.newFixedThreadPool(2); Future f = executor.submit(new MyCallLable()); System.out.println(f.get()); //单线程方式 FutureTask ft = new FutureTask(new MyCallLable ()); Thread t = new Thread(ft); t.start(); System.out.println(ft.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { if (executor != null) { executor.shutdown(); } } } static class MyCallLable implements Callable { @Override public Object call() throws Exception { return 1; } } static class MyRunnable implements Runnable { @Override public void run() { System.out.println(2); } }}
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}
该方法创建了一个核心线程和最大线程数一样的线程池,使用LinkedBlockingQueue这种无界队列存储多余的任务,也就是说,如果我们使用这种jdk自带的线程提交任务的时候,由于队列是无界的,当任务达到一定数量会造成内存溢出。这里不再分析ThreadPoolExecutor代码,有兴趣的可以看我的另一篇博文专门分析ThreadPoolExecutor源码的。该方法返回一个ExecutorService。
ThreadPoolExecutor继承体系如下图:
该方法实际调用的是实现类AbstractExecutorService.submit方法
publicFuture submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}
这里的newTaskFor方法就会将Callable任务传递到FutureTask类中,并封装到其Callable属性中
protectedRunnableFuture newTaskFor(Callable callable) { return new FutureTask (callable);}
/* 线程状态可能的转换: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED *///当前任务状态private volatile int state;//新创建private static final int NEW = 0;//即将结束,但还没有结束private static final int COMPLETING = 1;//正常结束private static final int NORMAL = 2;//异常状态:Callable接口的Call方法中具体业务逻辑出现异常private static final int EXCEPTIONAL = 3;//任务被取消private static final int CANCELLED = 4;//任务处于中断中private static final int INTERRUPTING = 5;//任务被中断private static final int INTERRUPTED = 6;//任务提交传入的Callable,用来调用call方法private Callablecallable;//Call方法返回值//1.如果任务正常结束,返回call方法的返回值//2.如果call方法发生异常,返回具体的异常信息private Object outcome;//当前执行的线程private volatile Thread runner;//一个栈结构的数据类型,存储被get方法阻塞的线程的引用private volatile WaitNode waiters;
public FutureTask(Callablecallable) { //外部需要传入Callable接口的实现 if (callable == null) throw new NullPointerException(); this.callable = callable; //将线程状态设置为先创建 this.state = NEW;}
从示例的线程池提交Calllable接口的案例中一步步分析:1.executor.submit(new MyCallLable())方法提交一个Callable实现;2.第一步实际会调用AbstractExecutorService.submit方法;3.AbstractExecutorService.submit内部调用newTaskFor方法生成一个FutureTask对象,并将MyCallLable任务封装到其Calllable属性中;4.AbstractExecutorService.submit方法内部调用ThreadPoolExecutor.execute方法提交FutureTask对象到线程池;5-6-7-8.实际就是线程池提交一个任务的执行过程,具体源码可以看我的另一篇博客,这里比较复杂,概况的说了下;9-10.线程池execute实际会执行FutureTask的run方法,在run方法中调用Callable.call,这就是线程池提交Callable执行的流程;
public void run() { //条件1:当前任务状态不是新建状态 //条件2:当前线程不是FutureTask持有的线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //退出执行 return; try { //当前FutureTask持有的callable Callablec = callable; //条件1:当前提交的Callable不能为空 //条件2:当前线程任务状态为新创建 if (c != null && state == NEW) { //Callable的返回值 V result; //任务是否成功执行 boolean ran; try { //调用用户自定义call方法的逻辑 result = c.call(); //任务成功执行 ran = true; } catch (Throwable ex) { //发生异常 result = null; ran = false; setException(ex); } //任务成功执行设置返回值 if (ran) set(result); } } finally { //run方法结束持有线程设置为空,help gc //这里可能正常执行完run方法也可能出现异常退出 runner = null; //当前任务执行状态 int s = state; //如果处于中断的状态,包含中断中和已中断,释放cpu资源 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
该方法设置任务成功执行后的执行结果状态和返回值,将返回值封装到outcome属性中,由于get方法是阻塞的,还需要唤醒阻塞的线程。
protected void set(V v) { //将状态从新建设置为结束中 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //返回值赋值 outcome = v; //设置任务状态为正常结束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //唤醒被get方法阻塞的线程 finishCompletion(); }}
在分析finishCompletion方法前,先介绍下WaitNode类。为什么会有这个类?我们知道FutureTask.get方法是阻塞的,如果我们在一个线程内多次调用get方法,这个从理论上考虑其实不需要WaitNode的;如果我们又多次创建了线程在其他线程内部调用get方法呢?由于FutureTask.get方法内部会调用LockSupport.park(Thread)或LockSupport.parkNanos阻塞线程,所以就需要唤醒;而LockSupport.unpark(Thread)解除线程阻塞也需要指定线程,所以这里就需要一种数据结构来存储当前线程的引用了。这里就设计了WaitNode这个类,它是一个单链表,而且采用的是头插法,在遍历的时候也是从前往后遍历的,这就是一个典型的栈的结构,先进后出,后进先出。这里为什么又是一个单链表结构呢?这是为了方便在任务结束的时候遍历。
static final class WaitNode { //当前线程的引用 volatile Thread thread; //指向下一个节点 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); }}
用于唤醒被get方法阻塞的线程
private void finishCompletion() { // assert state > COMPLETING; //从头开始遍历 for (WaitNode q; (q = waiters) != null;) { //使用cas方式设置当前waiters为空,防止外部线程调用cancel导致finishCompletion该方法被调用 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //获取当前WaitNode对应的线程 Thread t = q.thread; if (t != null) { q.thread = null; //help gc //唤醒当前节点对应的线程 LockSupport.unpark(t); } //获取当前节点的下一个节点 WaitNode next = q.next; if (next == null) break; q.next = null;//help gc //将q指向下要给节点 q = next; } break; } } done(); //将callable置为空,help gc callable = null; }
该方法将返回值设置为抛出的异常,将任务状态设置为EXCEPTIONAL状态,并调用finishCompletion方法唤醒被get阻塞的线程。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}
3.5.9.FutureTask.handlePossibleCancellationInterrupt方法分析
private void handlePossibleCancellationInterrupt(int s) { //如果任务状态处于中断中,释放cpu资源 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt}
两个方法区别不大,唯一的区别是阻塞线程的时候使用的LockSupport.parkNanos(this, nanos)和LockSupport.park(this),当有时间条件的时候LockSupport.parkNanos(this, nanos)会在指定时间内结束后自动唤醒线程。
这里讲讲sleep和LockSupport.parkNanos区别:sleep在指定时间到期后会判断中断状态,根据中断状态来判断是否需要抛出异常,而LockSupport.parkNanos不会根据中断状态做出响应。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s);}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; //unit.toNanos(timeout)将指定时间格式转化为对应的毫微秒 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s);}
t.interrupted()也是可以唤醒被LockSupport.park()阻塞的线程的
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? S ystem.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //自旋 for (;;) { //条件成立说明当前线程是被其他线程调用t.interrupted()这种中断方式唤醒 if (Thread.interrupted()) { //从队列中移除线程被中断的节点 removeWaiter(q); throw new InterruptedException(); } int s = state; //(4).s>COMPLETING成立,说明当前任务已经执行完,结果可能有好有坏 if (s > COMPLETING) { if (q != null) q.thread = null; //返回当前任务状态 return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); //(1).第一次自旋,q=null,创建当前线程对应的WaitNode对象 else if (q == null) q = new WaitNode(); //(2).第二次自旋,queued为false,q.next = waiters采用头插法将当前节点入栈 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //(3).第三次自旋,会走到这里,将线程阻塞,等待后续唤醒后继续自旋调用,也可能因为超时后自动唤醒 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //从队列中移除get超时的节点 removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }}
每次调用get方法都会将线程封装成WaitNode入栈,当调用get方法的线程由于被中断唤醒或者超时自动唤醒的都需要从队列中移除, 并重新组装栈结构。
一张图概况该方法做的事情:
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } }}
将返回值封装到outcome属性中返回,可能是正常的值也可能是一个异常信息
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x);}
public boolean cancel(boolean mayInterruptIfRunning) { //条件1:说明当前任务处于运行中 //条件2:任务状态修改 //条件1和条件2成立则执行下面cancel的核心处理逻辑,否则返回false代表取消失败 //可能会有多个线程调用cancel方法导致cancel失败的情况 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception //mayInterruptIfRunning是否中断线程 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) //中断线程 t.interrupt(); } finally { // final state //设置任务为中断状态 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //唤醒所有get阻塞的线程 finishCompletion(); } return true;}
“如何结合线程池理解FutureTask及Future源码”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流