一、用组合来实现线性安全
1.设计线程安全的类
设计线程安全类的三个基本要素:
1. 找出构成对象状态的所有变量
2. 找出约束状态变量的不变性条件
3. 建立对象状态的并发访问管理策略
要分析对象的状态,首先从对象的域开始。如果对象所有的域都是基本类型的变量,那么这些域将构成对象的全部状态;如果对象的域中引用了其他对象,那么该对象的状态将包含被引用的对象的域。
2.实例封闭
当一个对象被封装到另一个对象中,能够访问到被封装对象的所有代码路径都是已知的。通过将封闭机制和合适的加锁策略结合,可以确保以线程安全的方式来使用非线程安全的对象。
public class PersonSet{
private final Set<Person> mySet = new HashSet<Person>();
public sychronized void addPersion(Person p) {
mySet.add(p)
}
public sychronized boolean containsPerson(Person p) {
return mySet.contains(p);
}
}
解析:
虽然HashSet 并非线程安全的,但是mySet是私有的不会逸出。唯一能访问mySet的代码是addPerson(),和containsPerson()。在执行上他们都要获的PersonSet 上的锁。PersonSet的状态完全又它的内置锁保护。所以PersonSet是一个线程安全的类。
java 平台的类库有很多实例封闭的例子。比如一些基本的容器并非线程安全的,如ArrayList,HashMap。类库提供的包装器方法,Collections.synchronizedList(list)、Collections.synchronizedMap(m)只要这些包装器对象拥有对被包装容器对象的唯一引用(即把容器对象封装在包装器中),非线程安全的类就可以在多线程中使用。
3.线程安全性的委托
class Counter {
private AtomicInteger count = new AtomicInteger(0);
private int inc(){
return count.incrementAndGet();
}
}
对于Counter来说,由于Counter只有一个域就是AtomicInteger,而AtomicInteger又是线程安全的,所以很容易知道Counter是线程安全的。Counter把它的线程安全性交给了AtomicInteger来决定,也就说委托给了AtomicInteger来保证。
但是,当委托的状态变量超过1个时,就要看情况而言了。要看委托的状态变量之间是否有某种联系。如果委托的状态域是彼此独立的,那么不会影响组合的类的线程安全性。
class ListenerManager {
private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<KeyListener>();
private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<MouseListener>();
public void addKeyListener(KeyListener e){
keyListeners.add(e);
}
public void addMouseListener(MouseListener e){
mouseListeners.add(e);
}
public void removeKeyListener(KeyListener e){
keyListeners.remove(e);
}
public void removeMouseListener(MouseListener e){
mouseListeners.remove(e);
}
}
对于ListenerManager来说,它把它的线程安全性委托给了keyListeners 和mouseListeners,而这两种状态在类中不存在任何的耦合关系,因此他们组合而成的类也是线程安全的。(CopyOnWriteArrayList是一个线程安全的链表)
当委托的多个状态存在耦合关系时,委托可能会失效!
class NumberRange {
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);
public void setLower(int i){
//不安全的 先检查后执行
if(i > upper.get()){
throw new RuntimeException("最小值不能比最大值大");
}
lower.set(i);
}
public void setUpper(int i){
//不安全的 先检查后执行
if(i < lower.get()){
throw new RuntimeException("最大值不能比最小值小");
}
upper.set(i);
}
}
虽然lower和upper都是原子操作,但是由于在整个类中存在一个不变形条件——lower <= upper,setLower和setUpper都是“先检查后操作”的不安全操作,没有采取足够的加锁机制来保证这两个方法是一个原子操作(前面提到了,非原子性操作在多线程环境下是线程不安全的)。假设一个线程调用setLower(5),另一个线程调用setUpper(4),最终由于线程调度的顺序,有可能结果为(5,4)。
如果一个类,仅仅靠委托状态不足以维持线程安全性,这种情况下,这个类必须提供自己的加锁机制保证这些复合操作是原子操作!
4.客户端加锁机制实现线程安全
class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
....
public synchronized boolean putIfAbsent(E x){
boolean absent = !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
}
这种通过扩展类,来实现我们想要的功能——如果没有,则添加。我们很自然想到,在多线程环境下,putIfAbsent不是一个原子操作,因此我们会通过加锁来实现线程同步,那么真的实现了线程安全吗?
但实际上,这里用了两个不同的锁。
list是一个线程安全的链表,list中使用的锁的对象是List本身,而putIfAbsent中加的锁的对象是ListHelper,使用了不同的锁,意味着ListHelper相对于list来说,并不是原子操作,也就有可能一个线程调用putIfAbsent操作时,另一个线程调用其他list的其它方法。
下面是正确的加锁方式:
class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
public boolean putIfAbsent(E x){
synchronized(list){
boolean absent = !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
}
}
但是这种扩展类的方式仍然不值得推荐,因为会破坏同步策略的封装性。
二、Java类库的基础并发构建模块
1. 同步容器类
同步容器类包括Vector和HashTable,以及使用Collections.synchronizedXxx(例如Collections.synchronizedList(new ArrayList< T>()))等工厂方法创建的同步类。
这些类实现线程安全的方式是:把他们的状态封装起来,并对每一个共有方法都进行同步,使得每次只有一个线程能访问容器的状态。(从这个描述,可以看出,在高并发情况下可能效率是一个问题)
同步容器类存在的问题:
虽然同步容器类是线程安全的,对于Vector和HashTable,在类中提供的操作都是原子操作的,在多线程环境下就可以放心使用Vector和HashTable的方法。但是在一些复合操作上还是要加锁来实现同步,例如:迭代,条件运算(若不存在则添加)。
例如:
public static Object getLast(Vector list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
对于这种非原子操作,必须加锁达到线程同步
public static Object getLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
2.并发容器类
同步容器类将所有对容器状态访问都加了锁,以实现线程安全,代价就是严重降低了并发性,当多个线程竞争容器的锁时,吞吐量严重降低,而为了改善同步容器的性能,Java针对多个线程并发访问,提供了并发容器类。
例如:
ConcurrentMap,用来替代同步且基于散列的Map;
CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。
Queue和BlockingQueue等。
ConcurrentHashMap
CooncurrentMap使用了一个更加细化的锁机制, 名叫分离锁. 这个机制允许更深层次的共享访问. 任意数量的读取线程可以并发的访问Map, 读者和写者也可以并发的访问, 并且有限数量的写线程还可以并发修改Map, 结果是为并发带来更高的吞吐量, 同时几乎没有损失单线程访问的性能.
ConcurrentMap接口加入了对常见复合操作的支持, 比如”缺少即加入(putIfAbsent)”, 替换和条件删除, 而且这些操作都是原子操作
public interface ConcurrentMap<K, V> extends Map<K, V> {
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);
}
由于ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。(为什么,如何体现?)
CopyOnWriteArrayList
CopyOnWriteArrayList用于替代同步List,并且在迭代期间不需要对容器进行加锁或复制。多个线程可以对该容器进行迭代, 并且不会受到另一个或者多个想要修改容器的线程带来的干涉, 迭代的时候返回的元素严格与创建的时候一致, 不会考虑后续的修改.。
在每次CopyOnWriteArrayList改变时都需要对底层数组进行一次复制, 因此当容器比较大时, 不是很合适, 只有当容器迭代操作的频率远远高于对容器修改的频率, 写入即复制容器是一个合适的选择。
Queue
BlockingQueue提供了可阻塞的put和take方法, 他们与可定时的offer和poll是等价的. 如果Queue已经满了, put方法会被阻塞直到有空间可用; 如果queue是空的, 那么take方法会被阻塞, 直到有元素可用. queue的长度可以有限, 也可以无限.
可以使用BlockingQueue的offer方法来处理这样一种场景: 如果条目不能被加入到队列里, 它会返回一个失败状态. 这样可以创建更多灵活的策略来处理超负荷工作, 比如减轻负载, 序列化剩余工作条目并写入硬盘, 减少生产者线程, 或者其他方法儿子生产者线程.
在类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList相似,但比同步的List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,而不是FIFO。
SynchronousQueue
SynchronousQueue是一种BlockingQueue的实现,维护了一个没有存储空间的queue, 如果用洗盘子来比喻的话, 可以认为没有盘子架, 直接将洗好的盘子放到烘干机中. 因为是直接移交, 这样可以减少数据在生产者和消费者移动的延迟.
因为SynchronousQueue没有存储能力, 所以除非另一个线程已经准备好参与移交工作, 否则put和take会一直阻止, 这类队列只有在消费者充足的时候比较合适, 他们总是为下一个任务做好准备.
Deque
Deque(BlockingDeque)是一个双端队列是对Queue和BlockingQueue的扩展, 允许高效的在头和尾分别进行插入和删除, 其实现有ArrayDeque和LinkedBlockingDeque。
双端队列采用的是一种窃取的工作模式, 其原理是每一个消费者都有一个自己的双端队列, 如果一个消费者完成了自己的双端队列中的全部工作, 它可以偷取其他消费者的双端队列中末尾的任务. 由于消费者不会共享同一个队列, 因此相对于传统的生产者-消费者模式具有更高的可伸缩性. 而且即使一个工作者要访问另一个队列, 也是从末尾截取, 这样可以进一步降低对队列的争夺。
class Producer implements Runnable {
private String name;
private BlockingDeque<Integer> deque;
public Producer(String name, BlockingDeque<Integer> deque) {
this.name = name;
this.deque = deque;
}
public synchronized void run() {
for (int i = 0; i < 10; i++) {
try {
deque.putFirst(i);
System.out.println(name + " puts " + i);
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private String name;
private BlockingDeque<Integer> deque;
public Consumer(String name, BlockingDeque<Integer> deque) {
this.name = name;
this.deque = deque;
}
public synchronized void run() {
for (int i = 0; i < 10; i++) {
try {
int j = deque.takeLast();
System.out.println(name + " takes " + j);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class BlockingDequeTester {
public static void main(String[] args) {
BlockingDeque<Integer> deque = new LinkedBlockingDeque<Integer>(5);
Runnable producer = new Producer("Producer", deque);
Runnable consumer = new Consumer("Consumer", deque);
new Thread(producer).start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(consumer).start();
}
}
3.同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barried)以及闭锁(Latch),如果不能满足自己的需求,还能自己定制同步工具类。
闭锁(Latch)
闭锁可以延迟线程的进度直到其到达终止状态,一个闭锁工作起来就像一道大门: 直到闭锁达到终点状态之前, 门一直是关闭的, 没有线程通过, 在终点状态到来的时候, 这扇门会打开并允许所有的线程通过。一旦闭锁到达了终点状态, 它就不能再改变状态了, 所以它会永远保打开状态。
CountDownLatch是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件发生。它的状态包括一个计数器, 初始化为一个正数, 用来表现需要等待的事件数。countDown方法对计数器做减操作, 表示一个事件已经发生了, 而await方法等待计数器达到零, 这表示所有需要等待的时间都已经发生。如果计数器入口时值为非零, await会一直阻塞知道计数器为零, 或者等待线程中断以及超时。
public class TestHarness {
public long timeTasks(int n, final Runnable task) throws Exception {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await(); // 所有线程运行到此被暂停, 等待一起被执行
try {
task.run();
} finally {
endGate.countDown();
}
} catch (Exception e) {
}
};
};
t.start();
}
long start = System.nanoTime();
startGate.countDown(); // 启动所有被暂停的线程
endGate.await(); // 等待所有线程执行完
long end = System.nanoTime();
return end - start;
}
public static void main(String[] args) {
TestHarness th = new TestHarness();
Runnable r = new Runnable() {
public void run() {
System.out.println("running");
}
};
try {
th.timeTasks(10, r);
} catch (Exception e) {
e.printStackTrace();
}
}
startGate是一个开始门,endGate是结束门。startGate初始为1,而endGate初始为工作线程的数量。
FutureTask
FutureTask的计算是通过Callable实现的, 它等价于一个可以携带结果的Runnable, 并且有三个状态:
等待运行, 正在运行和运行完成。
运行完成有三种情况:
正常结束, 取消结束和异常结束
一旦FutureTask进入完成状态, 它会永远停止这个状态上。
FutureTask.get()的行为依赖于任务的状态, 如果它已经完成, get可以立即返回结果, 否则会被阻塞,直到任务转入完成状态, 然后会返回结果或者抛出异常.
class PreLoader <V> {
private final FutureTask<V> future = new FutureTask<V>(new Callable<V>() {
@Override
public V call() throws Exception {
//
return loadTask();
}
});
private Thread thread = new Thread(future);
public void start(){
thread.start();
}
public V get() throws InterruptedException, ExecutionException{
return future.get();
}
private V loadTask() {
//模拟加载任务
return null;
}
}
信号量(Semaphore)
计数信号量用来控制能够同时访问某特定资源的活动的数量或者同时执行某一给定操作的数量
技术信号量可以用来实现资源池或者给一个容器设定边界。
一个Semaphore管理一个有效的许可集,许可的初始量通过构造函数来指定。活动能够获得许可, 并在使用之后释放许可, 如果已经没有可用的许可了, 那么acquire会被阻塞,直到有可用的为止(或者直到被中断或者操作超时),release方法向信号量返回一个许可。
一个初始值为1的Semaphore可以用来充当mutex(互斥锁)。
public class BoundedHashSet <T>{
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int n) {
set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(n);
}
public boolean add(T element) {
try {
sem.acquire(); //请求信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = false;
try {
result = set.add(element);
}finally {
sem.release();
}
return result;
}
public void remove(T o) {
boolean result = set.remove(o);
if (result) {
sem.release(); //返回信号量
}
}
public static void main(String[] args) {
final BoundedHashSet<String> bhs = new BoundedHashSet<String>(3);
for (int i = 0; i < 4; i++) {
Thread t = new Thread() {
@Override
public void run() {
bhs.add(System.currentTimeMillis() + "");
};
};
t.start();
}
}
}
栅栏(Barrier)
关卡类似于闭锁, 他们能够阻塞一组线程, 直到某些事件发生, 其中关卡与闭锁的关键不同在于, 所有线程必须同时达到关卡点, 才能继续处理. 闭锁等待的是事件, 关卡等待其他线程. 关卡实现的是协议, 就像一些家庭成员指定商场中的集合地点:”我们每一个人6:00在麦当劳见, 到了以后不见不散, 之后我们再决定接下来做什么。”
CyclicBarrier允许一个给定数量的成员多次集中在一个栅栏位置,这在并行迭代算法中非常有用, 这个算法会把一个问题拆分成一系列相互独立的子问题, 当线程到达栅栏位置时, 调用await, await将会阻塞所有线程到达栅栏位置,直到所有线程到达关卡点。
关卡通常用来模拟这种情况, 一个步骤的计算可以并行完成, 但是要求必须完成所有与一个步骤相关的工作后才能进入下一步。
public class Cellular {
private CyclicBarrier cb;
private Worker[] workers;
public Cellular() {
int count = Runtime.getRuntime().availableProcessors();
workers = new Worker[count];
for (int i = 0; i < count; i++) {
workers[i] = new Worker();
}
cb = new CyclicBarrier(count, new Runnable() {
public void run() {
System.out.println("the workers is all end...");
}
});
}
public void start() {
for (Worker worker : workers) {
new Thread(worker).start();
}
}
private class Worker implements Runnable {
public void run() {
System.out.println("working...");
try {
cb.await();//在这里线程阻塞,等待其他线程。
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Cellular c = new Cellular();
c.start();
}
}
Exchanger是关卡的另外一种形式, 它是一种两步关卡, 在关卡交汇点会叫唤数据, 当两方进行的活动不对称时, Exchanger是非常有用的, 比如当一个线程向缓冲写入一个数据, 这是另一个线程充当消费者使用这个数据。
三、线程安全总结
- 可变状态越少,越容易保证线程安全性
- 尽量将域声明为final,除非需要它们是可变的
- 不可变对象一定是线程安全的
- 封装有利于管理复杂性
- 用锁来保护每一个可变变量
- 当保护同一个不变性条件的所有变量时,要使用同一个锁(最容易忽略)
- 在执行复合操作时,要持有锁