线程的取消与关闭

2016-08-24 19:25:34   最后更新: 2016-08-25 17:50:45   访问数量:609




让一个任务或线程启动很容易,但有时我们需要中止线程的执行,但这并不容易,因为 java 并没有提供任何机制来安全地终止线程

本篇日志中,我们就来介绍一下 java 线程退出的几种方式

 

由于线程间共享的特性,可以通过使用一个共享的 volatile 成员实现取消状态的保存,每隔一段时间,线程检查该成员的值判断是否应该退出

这是一种常见的协作式机制,让取消任务的线程遵循一种协商好的协议

package com.techlog.test.service; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created by techlog on 16/8/16. */ @Service public class TestExecutorService { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TestExecutorService.class); private static final ExecutorService exec = Executors.newFixedThreadPool(2); private static volatile boolean canceled = false; public String beepForOneMinute() throws ExecutionException, InterruptedException { List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 2; ++i) { final int finalI = i; futures.add(exec.submit(() -> { if (finalI == 0) { int num = 0; while (!canceled) { Thread.sleep(1000); num++; } return "Spend " + num + "second"; } else { Thread.sleep(3000); canceled = true; return ""; } })); } String ret = ""; for (Future<String> future : futures) { ret += future.get(); } exec.shutdown(); return ret; } }

 

 

上面的例子中,线程池开辟了两个线程,0号线程循环等待并递增局部计数器,记录循环次数,直到共享的 volatile 成员 canceled 变为 true,1号线程则在沉睡3秒后,将 canceled 成员置为 true,最终返回了:Spend 3second

 

Thread 类提供了 interrupt 方法,他虽然并不会让指定的线程立即中断执行,但是会设置相应的标识,在线程中可以通过 isInterrupted 方法获得当前是否被中断

事实上这样的方式与上面的例子是非常像的,当然,线程中也可以通过调用 static 的 interrupted 方法清除当前的中断状态,这个方法会将中断状态值返回

Thread.sleep、Thread.join、Object.wait 方法事实上都是基于等待 isInterrupted 返回 true 实现的,他们首先检查 isInterrupt 的返回,然后调用 interrupted 方法清除中断状态并抛出 InterruptedException 异常

从上面的描述中我们就可以看出,interrupt 方法并不能立即中断线程的执行,而是仅仅通知线程中断的请求

 

下面的代码实现了一个线程在三秒后中断另一个线程:

package com.techlog.test.service; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created by techlog on 16/8/16. */ @Service public class TestExecutorService { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TestExecutorService.class); private static final ExecutorService exec = Executors.newFixedThreadPool(20); private volatile Thread thread; public String beepForOneMinute() throws ExecutionException, InterruptedException { List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 2; ++i) { final int finalI = i; futures.add(exec.submit(() -> { if (finalI == 0) { int num = 0; thread = Thread.currentThread(); while (!thread.isInterrupted()) { System.out.println("hello"); num++; } return "Spend " + num + "times"; } else { Thread.sleep(3000); thread.interrupt(); return ""; } })); } String ret = ""; for (Future<String> future : futures) { ret += future.get(); } exec.shutdown(); return ret; } }

 

 

ExecutorService.submit 方法将返回一个 Future 对象来描述任务,出了实现任务的生命周期、处理异常,Future 提供了 cancel 方法来实现任务的取消

Future 的 cancel 方法提供了一个 boolean 类型的参数 MayInterruptIfRunning,表示任务是否支持接受中断,只有当该参数为 true 并且任务正在运行时,线程才会被中断

标准的 Executor 创建的任务都实现了一种终端策略使任务可以通过中断被取消,因此,在这种情况下,我们可以通过 Future 的 cancel 方法实现任务的取消,他与 interrupt 方法一样,也是通过设置终止状态标识实现的,因此你也同样需要你检查这个标识并进行相应的处理

下面的方法实现了一个多线程分别计数的程序,一旦有三个线程返回,即中止:

package com.techlog.test.service; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * Created by techlog on 16/8/16. */ @Service public class TestExecutorService { public String beepForOneMinute() throws ExecutionException, InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); List<Integer> threadRet = new ArrayList<>(); List<Future<Integer>> futures = new ArrayList<>(); for (int i = 0; i < 10; ++i) { futures.add(exec.submit(() -> { int num = (int) (Math.random() * 1000) * 10000; int index = 0; try { while (index < num) { if (Thread.currentThread().isInterrupted()) { System.out.println("interrupted"); break; } index++; } } catch (Exception e) { e.printStackTrace(); } return index; })); } String result = ""; boolean isContinue = true; int times = 0; while (isContinue) { for (int i = 0; i < 10; i++) { if (threadRet.contains(i)) { continue; } Future<Integer> future = futures.get(i); int ret; try { ret = future.get(0, TimeUnit.SECONDS); System.out.println("finish " + i + " " + times); } catch (TimeoutException e) { continue; } result += i + " : " + ret + "; "; threadRet.add(i); if (++times == 3) { isContinue = false; break; } } } for (int i=0; i < 10; ++i) { System.out.println("cancel " + i); futures.get(i).cancel(false); } exec.shutdown(); return result; } }

 

 

事实上,这样做和直接调用线程的 interrupt 方法并没有太大的区别

上面的问题是一个常见的线程同步问题,使用闭锁、栅栏、信号量都可以实现

 

ExecutorService 提供了三种关闭方法:

  1. shutdown -- 正常关闭,不再执行新的任务,待全部已启动任务执行完毕后关闭
  2. shutdownNow -- 强制关闭,无论是否有任务在执行都立即关闭
  3. awaitTermination -- 在等待超时后强制关闭

 

shutdown 方法在通常情况下是安全的,但有时线程执行时间过长,就应该在一定的时间后强制关闭,这时就需要 awaitTermination 方法

 






读书笔记      技术帖      龙潭书斋      线程      thread      java      java并发编程实战     


京ICP备15018585号