Executor 的线程结果收集

2016-08-17 19:17:46   最后更新: 2016-08-17 19:17:46   访问数量:404




上一篇日志中,我们设计了一个使用 ExecutorService 与 Runnale 接口创建线程的例子

Executor 框架及线程池的使用

此前我们介绍过 Runnable 与 Callable 的区别,Runnable 接口的 run 方法不能抛出异常也不具有返回值,而实际的使用中,我们常常需要在并发过程中获取每个线程的返回值,这时我们就需要 Callable 接口

 

最基本的线程执行结果收集方法是使用 Future 对象:

package com.techlog.test.service; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created by techlog on 16/8/16. */ @Service public class TestExecutorService { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TestExecutorService.class); private static final ExecutorService exec = Executors.newFixedThreadPool(20); public void beepForOneMinute() throws ExecutionException, InterruptedException { List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 20; ++i) { futures.add(exec.submit(() -> { int sleeptime = (int) (Math.random() * 10); LOGGER.info("Thread begin sleep " + sleeptime + " -- " + Thread.currentThread().getId()); Thread.sleep(sleeptime * 1000); return "Thread awake " + sleeptime + " -- " + Thread.currentThread().getId(); })); } for (Future<String> future : futures) { System.out.println(future.get()); } exec.shutdown(); } }

 

 

缺陷和改进

虽然这样的实现非常简单,但是有一个很严重的缺陷,那就是他是按照线程创建顺序来收集的,如果第一个线程执行时间远远超过其他线程的执行时间,那么主线程将一直等待第一个线程返回结果,而迟迟无法收集到其他线程的结果

如果你需要使用 Future 来改进这个问题,你可以在调用 Future 的 get 方法时传递 timeout 参数,如果将 timeout 参数设为 0,则方法会立即返回,这样你可以轮训所有对象收集他的结果

 

ExecutorService 配合 FutureTask 实现线程池的例子在此前 FutureTask 的介绍中我们已经写过一个:

java 线程同步工具类

 

此处我们再来设计一个简单明了有代表性的例子:

package com.techlog.test.service; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.concurrent.*; /** * Created by techlog on 16/8/16. */ @Service public class TestExecutorService { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TestExecutorService.class); private static final ExecutorService executorService = Executors.newFixedThreadPool(20); public void beepForOneMinute() throws ExecutionException, InterruptedException { for (int i = 0; i < 20; ++i) { MonitorFutureTask monitorFutureTask = new MonitorFutureTask(() -> { int sleeptime = (int) (Math.random() * 10); LOGGER.info("Thread begin sleep " + sleeptime + " -- " + Thread.currentThread().getId()); Thread.sleep(sleeptime * 1000); return "Thread awake " + sleeptime + " -- " + Thread.currentThread().getId(); }); executorService.submit(monitorFutureTask); } } } class MonitorFutureTask extends FutureTask<String> { public MonitorFutureTask(Callable<String> callable) { super(callable); } @Override protected void done() { try { System.out.println(get()); } catch (Exception e) { e.printStackTrace(); } } }

 

 

我们复写了 FutureTask 的 done 方法,一旦有任务执行结束就会立即调用它来进行相应的处理,通过 get 方法即可获取线程的执行结果

 

CompletionService 是通过 BlockingQueue 实现的,当任务完成,他会自动将任务结果封装为一个 Future 对象并放入到队列中,直到调用 take 方法取出队列中的 Future 对象

CompletionService 可以保证线程执行结束后的结果被立即收集

 

package com.techlog.test.service; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.concurrent.*; /** * Created by techlog on 16/8/16. */ @Service public class TestExecutorService { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TestExecutorService.class); private static final ExecutorService exec = Executors.newFixedThreadPool(20); public void beepForOneMinute() throws ExecutionException, InterruptedException { CompletionService<String> execcomp = new ExecutorCompletionService<>(exec); for (int i = 0; i < 20; ++i) { execcomp.submit(() -> { int sleeptime = (int) (Math.random() * 10); LOGGER.info("Thread begin sleep " + sleeptime + " -- " + Thread.currentThread().getId()); Thread.sleep(sleeptime * 1000); return "Thread awake " + sleeptime + " -- " + Thread.currentThread().getId(); }); } for (int i = 0; i < 20; ++i) { Future<String> future = execcomp.take(); System.out.println(future.get()); } exec.shutdown(); } }

 

 






技术帖      龙潭书斋      线程      thread      并发      java      concurrent      java并发编程实战      executor      completionservice      future      futuretask     


京ICP备15018585号