5.1 同步的Collections
JDK中,同步的Collections包含Vector和Hashtable,以及从1.2之后加入的Collections.synchronizedXXX 工厂构造函数生成的类。
这些类都内置了同步措施,确保任何时间只有一个线程能访问public方法。
然而这有一些例外:下标迭代、iteration迭代、foreach、next()、pre()等都是“快速失效的”,即有可能在你调用的过程中其他线程执行了修改,导致抛出异常。
因此,下面只是理论上的线程安全:
//可能会抛出异常 for (int i = 0; i < vector.size(); i++) doSomething(vector.get(i));
这可以通过客户端加锁来修正:
synchronized (vector) { for (int i = 0; i < vector.size(); i++) doSomething(vector.get(i)); }
但是加锁实际上性能开销很大,而且增大了死锁的概率。因此可以有另一种方法:在迭代前clone出一个副本出来,然后对“静态的”副本进行迭代操作等。
另外需要注意“隐式的”迭代器,例如对一个Set/Map来toSting也会隐式调用迭代器。
5.2 同步容器类
从Java 5开始,提供了一系列的线程安全的容器类。之前的版本都是通过“串行化”来实现线程安全的,并发性能很差。而Java 5之后提供的类具有较好的并发性能、并保证了线程安全
ConruccentMap是线程安全的Map接口。具有ConcurrentHashMap和ConrurrentSkipMap两个子类。前者用于替代HashMap,后者替代SortedMap。
CopyOnWriteArrayList是线程安全的List。当需要经常遍历的时候,使用较合适。
Queue接口,ConcurrentLinkedQueue:FIFO。
5.2.1 ConcurrentHashMap
ConcurrentHashMap与普通的同步Map不同,使用了不同的同步策略,具有更好的并发性。
更优秀的一点是,它们提供的Iterator不再是快速失效的了,即不会抛出ConcurrentModificationException。也就不必再遍历的时候加锁控制。
缺点:
(1)size()不再是准确的,而只是大概的估算值(因为不能立即反应变化)。
(2)不能被互斥锁锁定。
尽管如此,与Hashtable 或者 synchronizedMap相比ConcurrentHashMap还是非常有吸引力的,应该多使用。
因为不支持互斥锁定,因此就无法在Client的代码中实现组合原子操作,例如put-if-absent,removal-if-equal等。这些都由ConcurrentHashMap提供好了。
如果还需要自己写类似的组合原子操作,换用ConrurrentMap最好。
5.2.3 CopyOnWriteArrayList
CopyOnWriteArrayList/CopyOnWriteArraySet是用于替代同步的List/Set的。
迭代器同样不会抛出修改异常。当每次迭代前会拷贝List和Set的副本,因此这个副本是“Immute不变”的,就无需同步。
显然,拷贝的代价是巨大的,所以仅当元素不多且修改非常少、迭代非常多的情况下,才考虑使用它们。
5.3 阻塞队列与生产者-消费者模型
JDK6以后,提供了更为强大的接口:BlockingQueue和BlockingDequeue。
上述接口内置实现了“N对N的生产者-消费者”模型。当Queue为空的时候,take会被阻塞。同理Queue可设置“容量上限”,超过容量上线后,put操作会被阻塞。
它们的内部具有高并发的同步机制,因此使用下面的代码完全是线程安全的。
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
无需额外的同步控制了,而且线程可以N个。
实现了这些接口的有ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。分别对应于之前几乎所有的容器类。
BlockingDeQueue使用了更先进的“Working-Stell”模型,当一个线程没有工作可以做的时候,会从其他线程的工作队列“偷取”任务,以保证每个线程都是忙碌的。
这在Web-Crawler中经常被用到。
5.4 阻塞和可中断(Interruptible)的方法
当线程在活动之前或活动期间处于正在wait()等待、sleep()休眠或占用状态且该线程被中断时,抛出该异常。有时候,一种方法可能希望测试当前线程是否已被中断,如果已被中断,则立即抛出InterruptedException异常。
被中断 != 线程终止!只代表线程状态的改变。
可中断的方法(Interruptible)是指方法将线程被中断的时候抛出InterruptedException异常。例如BlockingQueue提供的take和put都是这种可中断的方法。
当你的程序调用这种“可中断”方法的时候,需要捕获这种异常时,有两种处理方法:
1、继续传播。2、自我捕获。
下面是一个例子:
public class TaskRunnable implements Runnable { BlockingQueue<Task> queue; ... public void run() { try { processTask(queue.take()); } catch (InterruptedException e) { // restore interrupted status Thread.currentThread().interrupt(); } } }
5.5 同步装置
同步装置:协调控制并发流程的各种机制。例如前面的BlockingQueue、信号量、栅栏、闸门等。
5.5.1 门阀
延迟线程直到到达某个中介状态。闸门打开后状态将维持不变。
用途:
1、保证资源初始化前不会被使用。
2、确保服务的依赖顺序,当之前服务启动之前,不会启动后面的服务。
3、等待多方因素都参与到活动中。例如多人参加的在线游戏。
CountDownLatch(JDK5后提供)是一种经典的门阀。它允许多个线程等待一系列的事件发生。门阀状态由正整数表示,即等待的事件个数。countDown()方法 对计数器减一,表示某个事件发生了。调用await()方法将阻塞,直到countDown减到0(所有的事件都发生)。
用这个方法可以精确计算并发程序的效率。即:
1、先创建n个线程,然后让他们等待第一个阀门。
2、创建完毕后,开启第一个阀门。第二个阀门设置为N(线程个数)
3、每个线程执行完毕后阀门二countDown()。
最后主线程就得到了比较精确的时间了。
JDK里面的例子比较直观,书上给的有些小别扭。
public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end-start; } }
5.5.2 FutureTask
Future同时实现了Callable接口和Runnable,
Runnable使其能像个线程一样工作,儿Callable支持返回结果。
因此经常用在Executor中,异步的执行任务并返回结果,或者预计算(计算耗时但无需同步地等待结果)。
它提供了get()用于获取执行返回值。如果线程没有执行完毕,get将被阻塞直到执行完毕返回。如果执行完毕了,马上就返回。
public class Preloader { private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else throw launderThrowable(cause); } }
5.5.3 信号量
Semaphore类在JDK5后提供。
acquire()获得一个信号量(没有则阻塞),release()释放一个信号量。
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) sem.release(); } } public boolean remove(Object o) { boolean wasRemoved = set.remove(o); if (wasRemoved) sem.release(); return wasRemoved; } }
用处很多,就不说了。
5.5.4 屏障
屏障与闸门类似,闸门时一次性的,而栅栏可以被reset重新使用。
CyclicBarrier是一个同步辅助类,允许一组线程相互等待,直到到达某个公公屏障点,它再释放后可以重新reset重用。
class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; barrier = new CyclicBarrier(N, new Runnable() { public void run() { mergeRows(...); } }); for (int i = 0; i < N; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } }
屏障经常用在仿真中,某个步骤可以被并行分解为多个步骤,但是必须都完成了才能走下一步的时候。
另外一种形式是Exchanger,可以拿来直接实现双缓冲、生产者消费者之类的。