7.1 任务的取消
任务应该是可取消的:在run的外界可以让其状态从运行变为终止。
需要取消任务的场景:
1、用户取消任务,如点击了“取消”
2、时间timeout限制的任务
3、程序外部事件需要处理。
4、出错
5、关闭、退出、清理
Java中没有原生提供“停止线程”的方法,但可以使用“bool标志位+volatile”的方式来实现,注意list还是需要被锁保护的。
@ThreadSafe public class PrimeGenerator implements Runnable { @GuardedBy("this") private final List<BigInteger> primes = new ArrayList<BigInteger>(); private volatile boolean cancelled; public void run() { BigInteger p = BigInteger.ONE; while (!cancelled ) { p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public void cancel() { cancelled = true; } public synchronized List<BigInteger> get() { return new ArrayList<BigInteger>(primes); } } List<BigInteger> aSecondOfPrimes() throws InterruptedException { PrimeGenerator generator = new PrimeGenerator(); new Thread(generator).start(); try { SECONDS.sleep(1); } finally { generator.cancel(); } return generator.get(); }
7.1.1 Interruption
有的时候这种简单机制足矣,但是如果使用了BlockingQueue呢?get()将被阻塞,根本没机会去check这个bool的状态。这就是Interruption机制的用武之地了。
Thread内置bool变量:
1、调用线程对象的interruption(),改变bool状态,线程变为被中断
2、Thread.sleep、Object.wait以及可能抛出InterruptionException的方法会检测到,并尽快抛出异常、返回。同时清空bool状态。
3、static的interrupted将清理当前线程的bool的值并恢复之前的状态
interruption(和中断一样),并发马上终止,而是在线程的“下一个最快可能时间”结束并转入其他流程。
interrupt有两个捕获点:
1、正在Sleep、wait时抛出的InterruptionException异常
2、正好处于while循环check时,检查isInterruption()
class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; PrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger p = BigInteger.ONE; while (!Thread.currentThread().isInterrupted()) queue.put(p = p.nextProbablePrime()); } catch (InterruptedException consumed) { /* Allow thread to exit */ } } public void cancel() { interrupt(); } }
7.1.2 Interruption策略
由于线程没有直接的cancel机制,所以一种比较“官方”的方法是通过“interrupt”。
线程的调用者可以对线程进行interrupt(),线程可以定期对isInterrupt()进行判断,并进行相应的处理。
线程自己也可以Thread.currentThread().interrupt();
7.1.3 响应Interruptiion
在抛出InterruptedException异常时,有两种策略:
1、传递异常(throws直接)
2、重设interrupt状态(因为你不想抛出异常导致程序退出)
策略1,直接加上throws:
public Task getNextTask() throws InterruptedException {
return queue.take();
}
策略2,捕获异常后,再次Thread.currentThread().interrupt():
public Task getNextTask(BlockingQueue<Taskgt; queue) { boolean interrupted = false; try { while (true) { try { return queue.take(); } catch (InterruptedException e) { interrupted = true; // fall through and retry } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }
7.1.4 例子
“ 在指定时间内运行某线程,超时则结束掉”是一个常见的业务需求。
除了上述各种蹩脚的办法外,还可以用Future解决这个问题。
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // task will be cancelled below } catch (ExecutionException e) { // exception thrown in task; rethrow throw launderThrowable(e.getCause()); } finally { // Harmless if task already completed task.cancel(true); // interrupt if running } }
尽管get返回的是null,但是这种方法可以清晰的了解到到底是超时,还是运行时错误导致的异常。
7.1.6 处理无法被interrupt的代码
不是所有的方法都能“响应”interrupt,特别是I/O阻塞这些。有一些特殊的技巧来取消被阻塞的任务:
java.io的同步阻塞:直接关闭socket、文件。(可能会抛出SocketException)
java.nio的同步阻塞:打断InterruptibleChannel上等待的线程,会抛出异常CloseByInterruptException。关闭Channel会抛出AsynchronousCloseException。多数Channel都实现了InterruptibleChannel。
java.nio的异步非阻塞I/O:如果被阻塞在Selector.select上,可以调用wakeup让它直接返回。
public class ReaderThread extends Thread { private final Socket socket; private final InputStream in; public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); } public void interrupt() { try { socket.close(); } catch (IOException ignored) { } finally { super.interrupt(); } } public void run() { try { byte[] buf = new byte[BUFSZ]; while (true) { int count = in.read(buf); if (count < 0) break; else if (count > 0) processBuffer(buf, count); } } catch (IOException e) { /* Allow thread to exit */ } } }
尽管Runnable不支持cancel,但是将它转化成Future以支持cancel。
可以利用ThreadPoolExecutor的newTaskfor将Callable包装成RunnableFuture,以支持cancel。newTaskFor负责将向pool中提交的Task转化成Future(RunnableFuture)。
而定义cancel,可以在定义Task的时候用匿名函数搞定。
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } @ThreadSafe public class CancellingExecutor extends ThreadPoolExecutor { ... protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask) return ((CancellableTask<T>) callable).newTask(); else return super.newTaskFor(callable); } } public abstract class SocketUsingTask<T> implements CancellableTask<T> { @GuardedBy("this") private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } public synchronized void cancel() { try { if (socket != null) socket.close(); } catch (IOException ignored) { } } public RunnableFuture<T> newTask() { return new FutureTask<T>(this) { public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } }