Java并发编程之CompletionService

导读:本篇文章讲解 Java并发编程之CompletionService,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

简介:CompletionService功能是以异步的方式一边生成新任务,一边处理完成的任务,使得任务的执行与处理进行分离。使用submit提交任务,使用task获取已经完成的任务

先看个示例:

此功能是获取每个任务的执行结果,这里通过future来实现

public class FutureDemo {
	
	private static class CallableTask implements Callable<String> {

		private int time;
		private String name ;
		public CallableTask(int time, String name) {
			this.time = time ;
			this.name = name ;
		}
		
		@Override
		public String call() throws Exception {
			TimeUnit.SECONDS.sleep(this.time) ;
			return name ;
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		List<Callable<String>> tasks = new ArrayList<>() ;
		List<Future<String>> results = new ArrayList<>() ;
		for (int i = 1; i <= 5; i++) {
			tasks.add(new CallableTask(i, "name" + i)) ;
		}
		for (int i = 0; i < 5; i++) {
			results.add(pool.submit(tasks.get(i))) ;
		}
		for (int i = 4; i >= 0; i--) {
			System.out.println(results.get(i).get()) ;
		}
	}
	
}

执行结果:

Java并发编程之CompletionService

 

这里提交了5个任务,分别等待1到5秒的时间,在获取任务执行结果的时候,不管哪个任务先执行都只会按照你的获取顺序去获取即便是有其它的任务已经先执行完。future具有阻塞功能。

接下来用CompletionService实现:

public class CompletionServiceDemo {
	
	private static class CallableTask implements Callable<String> {

		private int time;
		private String name ;
		public CallableTask(int time, String name) {
			this.time = time ;
			this.name = name ;
		}
		
		@Override
		public String call() throws Exception {
			TimeUnit.SECONDS.sleep(this.time) ;
			return name ;
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		List<Callable<String>> tasks = new ArrayList<>() ;
		CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
		for (int i = 1; i <= 5; i++) {
			tasks.add(new CallableTask(i, "name" + i)) ;
		}
		for (int i = 0; i < 5; i++) {
			cs.submit(tasks.get(i)) ;
		}
		for (int i = 4; i >= 0; i--) {
			System.out.println("--") ;
			System.out.println(cs.take().get()) ;
		}
	}
	
}

执行结果:

Java并发编程之CompletionService

 

运行结果发现哪个任务先执行完take就会获取那个任务,当没有任务完成的时候take会阻塞。


CompletionService的poll方法非阻塞的,poll方法获取并删除已经完成的Future,如果没有就返回

null。

public class CompletionServiceDemo2 {
	
	private static class CallableTask implements Callable<String> {

		private int time;
		private String name ;
		public CallableTask(int time, String name) {
			this.time = time ;
			this.name = name ;
		}
		
		@Override
		public String call() throws Exception {
			TimeUnit.SECONDS.sleep(this.time) ;
			return name ;
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
		for (int i = 0; i < 5; i++) {
			cs.submit(new CallableTask(i, "name" + i)) ;
		}
		for (int i = 4; i >= 0; i--) {
			System.out.println(cs.poll()) ;
		}
	}
	
}

执行结果:

Java并发编程之CompletionService

 

获取到了第一个完成的任务,其它的都返回了null。

poll返回还可以设置超时时间

public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
		for (int i = 0; i < 5; i++) {
			cs.submit(new CallableTask(i, "name" + i)) ;
		}
		for (int i = 4; i >= 0; i--) {
			System.out.println(cs.poll(5, TimeUnit.SECONDS)) ;
		}
	}

这里超时设置了5秒钟,如果在5秒内获取到了数据那就继续向下执行,如果超过了5秒也会继续向下执行。


CompletionService异常处理

示例:

public class CompletionServiceDemo3 {
	
	private static class CallableTask implements Callable<String> {

		private int time;
		private String name ;
		public CallableTask(int time, String name) {
			this.time = time ;
			this.name = name ;
		}
		
		@Override
		public String call() throws Exception {
			if (this.time == 2) {
				throw new RuntimeException("发生了异常:" + this.name) ;
			}
			TimeUnit.SECONDS.sleep(this.time) ;
			return name ;
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
		for (int i = 0; i < 5; i++) {
			cs.submit(new CallableTask(i, "name" + i)) ;
		}
		for (int i = 4; i >= 0; i--) {
			System.out.println(cs.take()) ;
		}
	}
	
}

这里设置了当time为2时抛出异常,执行结果:

Java并发编程之CompletionService

 

在这里获取结果的时候只是调用了take,并没有调用get方法获取值信息,所以这里跟没有异常一样没有反应出来。

获取返回值时:

public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
		for (int i = 0; i < 5; i++) {
			cs.submit(new CallableTask(i, "name" + i)) ;
		}
		for (int i = 4; i >= 0; i--) {
			System.out.println(cs.take().get()) ;
		}
	}

执行结果:

Java并发编程之CompletionService

 

当调用了cs.take().get()返回值时异常才抛出。从这也发现当有异常抛出时,其它还任务都将被终止运行。


CompletionService.submit(Runnable, V)指定返回值(当执行成功)

public class CompletionServiceDemo3 {
	
	private static class User {
		public String name ;
	}
	
	private static class Task implements Runnable {
		private User user ;
		public Task(User user) {
			this.user = user ;
		}
		@Override
		public void run() {
			try {
				TimeUnit.SECONDS.sleep(5) ;
			} catch (InterruptedException e) {
				e.printStackTrace();
			}			
			System.out.println("任务执行完成。。。") ;
			this.user.name = "张三" ;
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)) ;
		CompletionService<User> cs = new ExecutorCompletionService<>(pool) ;
		User user = new User() ;
		Future<User> f = cs.submit(new Task(user), user) ;
		System.out.println(f.get().name) ;
	}
	
}

执行结果:

Java并发编程之CompletionService

 

完毕!!!

给个关注+转发呗,谢谢

Java并发编程之CompletionService

Oracle数据同步到Elasticsearch

spring data jpa 高级应用

Spring Cloud Sentinel 流控限流

Spring MVC 异常处理方式

Spring MVC 异步请求方式

Spring Cloud Sentinel 熔断降级

Spring Cloud Sentinel 基础配置

SpringCloud Nacos 整合feign

SpringMVC参数统一验证方法

Springboot Security 基础应用 (1)

SpringBoot多数据源配置详解

springboot mybatis jpa 实现读写分离

Spring Cloud Sentinel 热点参数限流

SpringBoot开发自己的Starter

SpringCloud Hystrix实现资源隔离应用

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/80020.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!