多线程与并发系列之CompletionService

勤奋不是嘴上说说而已,而是实际的行动,在勤奋的苦度中持之以恒,永不退却。业精于勤,荒于嬉;行成于思,毁于随。在人生的仕途上,我们毫不迟疑地选择勤奋,她是几乎于世界上一切成就的催产婆。只要我们拥着勤奋去思考,拥着勤奋的手去耕耘,用抱勤奋的心去对待工作,浪迹红尘而坚韧不拔,那么,我们的生命就会绽放火花,让人生的时光更加的闪亮而精彩。

导读:本篇文章讲解 多线程与并发系列之CompletionService,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

概述

想要知道线程的执行结果,除了FutureTask和CompletableFuture之外,CompletionService是另一个不错的选择。

CompletionService是一个Java8新增的泛型接口,其实现类ExecutorCompletionService,用于主线程提交多个任务后,任务完成即处理结果,并按照任务完成顺序逐个处理。这个类是为线程池中Task的执行结果服务的,即为Executor中Task返回Future而服务的。

CompletionService以异步的方式一边生产新的任务,一边处理已完成的任务的结果。这样可以将执行任务与处理处理分离开来处理。使用submit执行任务,使用take取得已经完成的任务。内部使用Executor框架和BlockingQueue来实现的。

原理

CompletionService源码:

public interface CompletionService<V> {
	// 提交一个Callable类型任务,并返回该任务执行结果关联的Future
	Future<V> submit(Callable<V> task);
	// 提交一个Runnable类型任务,并返回该任务执行结果关联的Future
	Future<V> submit(Runnable task,V result);
	// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成
	Future<V> take();
	// 从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞
	Future<V> poll();
	// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null
	Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

实现类ExecutorCompletionService,该类只有三个成员变量,源码:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    // ExecutorService的扩展,可以获得线程执行结果的
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    // 略
}

类图如下:
在这里插入图片描述
ExecutorCompletionService主要是增强executor线程池的。Task包装后被塞入completionQueue,当Task结束,其Future就可以从completionQueue中获取到。
在这里插入图片描述
ExecutorCompletionService 实现类依赖于 Executor 完成实际的任务提交执行,自己主要负责结果的排队处理,AbstractExecutorService.invokAny 实现就依赖此类,ExecutorCompletionService 内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

每个提交给 Executor 的任务都是通过继承 FutureTask 封装过的,FutureTask 在任务结束后会回调 done 方法,所以 ExecutorCompletionService 就在继承 FutureTask 封装重写的 done 方法中将当前 FutureTask 加入额外队列,然后通过其 take 或者 poll 方法获取的实质就是从这个额外队列中取数据。

从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加额外的等待时间。而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束时,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

先完成的必定先被取出,减少不必要的等待时间。ExecutorCompletionService 类提供此方法的一个实现。

实例

// todo

参考

ExecutorCompletionService

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/142293.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!