5.1 同步容器类
5.1.1同步容器类的问题
在同步容器类中,复合操作在没有客户端加锁的的情况下仍然是线程安全的,
但当其他线程并发地修改容器时,可能有问题
程序清单5-1 Vector上可能导致混乱结果的复合操作
/**
* 以下代码看似没问题:无论多少个线程同时调用,都不会破坏Vector
* 但是从调用者的角度来讲情况就不同了。
* 如果A线程在包含10个元素的Vector上调用getList
* 同时B线程对同一个Vector调用removeLast,这些操作交替执行,
* 则可能出现ArryIndexOutOfBoundsException异常
*/
public class Test01 {
public static Object getLast(Vector<Object> list) {
int lastIndex = list.size() -1;
return list.get(lastIndex);
}
public static Object removeLast(Vector<Object> list) {
int lastIndex = list.size() -1;
return list.remove(lastIndex);
}
}
程序清单5-2在使用客户端加锁的Vector上的复合操作
import java.util.Vector;
public class Test2 {
public static Object getLast(Vector<Object> list) {//原子操作
synchronized (list) {//保证Vector的大小在get和remove之间不会发生
int lastIndex = list.size() -1;
return list.get(lastIndex);
}
}
public static Object removeLast(Vector<Object> list) { //原子操作
synchronized (list) {//保证Vector的大小在get和remove之间不会发生
int lastIndex = list.size() -1;
return list.remove(lastIndex);
}
}
}
程序清单5-3 可能抛出ArrayIndexOutOfBoundsException的迭代操作
//此处在循环,其他线程删除元素,会产生问题
for(int i = 0; i < vector.size(); i++) {
doSomething(vector.get(i));
}
程序清单5-4 带有客户端加锁的迭代操作(解决不可靠迭代)
/**
在迭代期间持有Vector的锁,可以防止其他线程在迭代期间修改Vector,
然而这同样会导致其他线程在迭代期间无法访问它,因此降低了并发性
之所以要在客户端加锁(虽然Vector本身有内置锁,线程安全的)
是因为此处有复核操作:
【vector.size() + vector.get(i)】且doSomthing()里面可能还对vector有修改
*/
synchronized(vector) {
for(int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
}
迭代的标准方式是Iterator,然而有其他的线程并发地修改容器,
即使是使用Iterator也无法避免在迭代期间对容器加锁。 在涉及同步容器类的迭代器时并没有考虑到并发修改的问题。并且他们的表现出的行为是“及时失败”(fail-fast)的。这意味着当他们发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。这种“及时失效”并不是一种完备的处理机制,只是“善意地”捕获并发错误,因此只能作为并发问题的预警指示器。
`程序清单5-5 通过Iterator来迭代List
List<Widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>());
...
/*javac将生产Iterator代码来循环,反复调用hasNext和next来迭代List对象,可能抛出ConcurrentModificationException
*/
for(Widget w : widgetList)
doSomething(w);
很多时候不希望加锁。例如:某些线程在可以访问容器之前,必须等待迭代过程结束,如果容器规模很大,或者每个元素上执行操作的时间很长,那么这些线程将长时间等待。容器像5-4那样加锁,可能会产生死锁。
如果不希望在迭代期间对容器加锁,那么一种替代方案是“克隆”容器,副本被封闭在线程内,并在副本上进行迭代。(容器大时有性能问题)
5.1.3 隐藏迭代
虽然加锁可以防止迭代器抛出ConcurrentModificationException,但必须在所有对共享容器进行迭代的地方都需要加锁。
有些情况下,迭代器会隐藏起来。
public abstract class AbstractCollection<E> implements Collection<E> {
public String toString() {
Iterator<E> it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) { //死循环
E e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}
}
程序清单 5-6 隐藏在字符串连接中的迭代操作(不要这么做)
@NotThreadSafe
public class HiddenIterator {
@GuardedBy("this")
private final Set<Integer> set = new HashSet<>();
public synchronized void add(Integer i) { set.add(i); }
public synchronized void remove(Integer i) { set.remove(i);}
/**
* 可能会抛出ConcurrentModificationException,容器的toString()方法对容器进行了迭代
* 真正的原因在于HiddenIterator不是线程安全的。在使用println中的set对象之前必须首先获取HiddenIterator的锁,
* 但在调试代码的过程中通常会忽略这个问题
*/
public void addTenThings() {
Random r = new Random();
for(int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("Debug: add ten elements to + " + set);
}
}
5.2并发容器
5.2.1ConcurrentMap
程序清单5-7 ConcurrentMap 接口
public interface ConcurrentMap<K,V> extends Map<K,V> {
//仅当K没有响应的映射值时才插入
V putIfAbsent(K key, V value);
//仅当K被映射到V时才移除
boolean remove(Object key, Object value);
//仅当K被映射到oldValue时才替换为newValue
boolean replace(K key, V oldValue, V newValue);
//仅当K被映射到某个值时才替换为newValue
V replace(K key, V value);
}
5.3 阻塞队列和 生产者——消费者 模式
程序清单 5-8 桌面搜索应用程序中的生产者任务和消费者任务
/**
* 生产者消费者提供了一直适合线程的方法将桌面搜索问题分解为更简单的组件。将文件的遍历与监理索引等功能分解为独立的操作,
* 比将所有功能都放到一个操作中实现由着更高的代码可读性和可重用性:每个操作值完成一个任务,
* 并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单和清晰,并且可以并发执行,效率大于并行执行。
* @param root
* @throws InterruptedException
*/
package com.everjiankang.miaosha;
import java.io.File;
import java.io.FileFilter;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
/**
* 文件爬虫程序(生产者)
*/
public class FileCrawer implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
private final Set<String> alreadyExistFileSet = new ConcurrentSkipListSet<>();
public FileCrawer(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) {
super();
this.fileQueue = fileQueue;
this.fileFilter = fileFilter;
this.root = root;
}
@Override
public void run() {
try {
this.crawl(root);
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
if(root != null) {
File[] entries = root.listFiles(fileFilter);
if(entries != null) {
for(File file : entries) {
if(file.isDirectory())
crawl(file);
else if(!alreadyIndexed(file)) {
System.out.println("生产者放入队列:" + file.getPath() + file.getName());
fileQueue.put(file);
alreadyExistFileSet.add(file.getPath()+file.getName());
}
}
}
}
}
private boolean alreadyIndexed(File file) {
return alreadyExistFileSet.contains(file.getPath()+file.getName());
// return fileQueue.contains(file);
}
}
消费者
package com.everjiankang.miaosha;
import java.io.File;
import java.util.concurrent.BlockingQueue;
/**
* 消费者,建立文件索引
* @author guchunchao
*
*/
public class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
indexFile(queue.take());
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void indexFile(File file) {
System.out.println("消费:" + file.getPath()+file.getName());
}
}
程序清单5-9 启动桌面搜索(生产者-消费者调用)
package com.everjiankang.miaosha;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class XiaoFeiQueue {
private final static int BOUND = 10;
private final static int N_CONSUMMERS = 10;
public static void main(String[] args) {
File file = new File("/Users/guchunchao/Desktop/深入理解java虚拟机视频1-6");
File[] files = new File[10];
files[0] = file;
startIndexing(files);
}
public static void startIndexing(File[] files) {
BlockingQueue<File> queue = new LinkedBlockingDeque<File>(BOUND);
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return true;
}
};
//n个线程爬取文件
for(File file : files) {
new Thread(new FileCrawer(queue, filter, file)).start();
}
//10个线程消费
for(int i = 0; i < N_CONSUMMERS; i++) {
new Thread(new Indexer(queue)).start();
}
}
}
5.3.2 串行线程封闭
5.3.3 双端队列与工作密取
5.4 阻塞方法与中断方法
程序清单5-10 回复中断状态以避免屏蔽中断
public class TaskRunnable implements Runnable {
BlockingQueue queue;
.......
public void run() {
try{
processTask(queue.take());
}catch(Exception e) {
Thread.currentThread().intterrupt(); //恢复被中断的状态
}
}
}
5.5 同步工具类
5.5.1闭锁
程序清单5-11 在计时测试中使用CountDownLatch来启动和停止线程
/**
* 测试n个线程并发执行某个人物时需要的时间
* latch 英[lætʃ] n. 门闩; 弹簧锁;
*/
public class TestHarness {
//任务task就一份逻辑,用nThreads个线程并发执行
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
startGate.await(); //在每一个工作线程的开始处放一道栏杆,在主线程中控制开启
try {
task.run();
}finally {
endGate.countDown(); //每个线程运行完一个逻辑,关闭门数量就减一
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown(); //启动门使得主线程同时释放所有工作线程
endGate.await(); //结束门使主线程等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成
long end = System.nanoTime();
return end - start;
}
}
5.5.2FutureTask
程序清单5-2 使用FutureTask来提前加载稍后需要的数据
package com.multithread.unit05;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
//程序清单5-2 使用FutureTask来提前加载稍后需要的数据
public class Preloader {
private final FutureTask<ProjectInfo> future = new FutureTask<>(new Callable<ProjectInfo>() {
@Override
public ProjectInfo call() throws Exception {
return loadProjectInfo();
}
});
ProjectInfo loadProjectInfo(){
//TODO 从DB加载产品信息
ProjectInfo info = new ProjectInfo("aaa");
return info;
}
private final Thread thread = new Thread(future);
//在构造函数和静态初始化方法中启动线程不是一种好方法,
//因此提供start方法来启动线程
public void start() {
thread.start();
}
public ProjectInfo get() {
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
return null;
}
}
public static void main(String[] args) {
Preloader p = new Preloader();
p.start();
System.out.println(p.get());
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
class ProjectInfo{
private String name;
}
5.5.3信号量Semaphore
程序清单5-14 使用信号量为容器设置边界
/**
* 计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,
* 或者同时执行某个指定操作的数量。计数信号量还可以实现某种资源池,或者对容器施加边界
*
* Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。
* 在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。
* 如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或操作超时)
* release将返回一个许可给信号量
*/
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
/**
* 信号量的计数值会初始化为容器的最大值。
*/
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
this.sem = new Semaphore(bound);
}
/**
* add操作向底层容器中添加一个元素之前,首先获取一个许可。
* 如果add操作没有添加任何元素,那么会立刻释放许可。
*/
public boolean add(T o) throws InterruptedException {
sem.acquire();//获取(消费)一个许可
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if(!wasAdded)
sem.release();//release()方法将返回一个许可给信号量
}
}
/**
* remove操作释放一个许可,使更多的元素能添加到容器中来。
* 底层的Set实现并不知道关于边界的任何信息,这是由BoundedHashSet来处理的
* @param o
* @return
*/
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if(wasRemoved)
sem.release();//release()方法将返回一个许可给信号量
return wasRemoved;
}
}
程序清单5-15 通过CyclicBarrier协调细胞自动衍生系统中的计算(没看懂)
/**
* 通过栅栏来计算细胞的自动化模拟,在把模拟过程并行化时,
* 为每一个元素分配一个独立的线程是不现实的,因为将产生过多线程,
* 而在协调这些线程上导致的开销将降低计算性能。
*
* 合理的做法:
* 将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。
* CellularAutomata将问题分解成N(CPU)个字问题。
*/
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
mainBoard.commitNewValues();
}
});
this.workers = new Worker[count];
for (int i = 0; i < count; i++) {
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
}
//工人将细胞填满
private class Worker implements Runnable{
private final Board board;
public Worker(Board board) {this.board = board;}
@Override
public void run() {
while(!board.hasConverged())
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x,y));
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
return;
}
}
public int computeValue(int x, int y) {
return x + y;
}
public void start() {
for(int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
}
}
/**
* 细胞类,方法内容都是瞎写的
*/
class Board{
private int x,y,value;
public void setNewValue(int x, int y,int value) {
this.x = x;
this.y = y;
this.value = value;
}
//converge 英[kənˈvɜ:dʒ] 聚集
public boolean hasConverged() {
//TODO
return true;
}
public int getMaxX() { return 100; }
public int getMaxY() { return 100; }
//计算新的值
public void commitNewValues() {
//TODO
}
//衍生子细胞
public Board getSubBoard(int count, int i) {
Board newBoard = new Board();
newBoard.setNewValue(i, i, count);
return newBoard;
}
public void waitForConvergence() {
}
}
Exchanger
一个线程向缓冲区写数据,另一个线程从缓冲区读取数据,这些线程可以用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当2个线程通过Exchanger交换对象时,这种交换就把2个对象安全地发布给另一方。
5.6 构建高效且可伸缩的结果缓存
程序清单5-16 使用HashMap和同步机制来初始化缓存,串行效率低
public interface Computable<A,V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String, BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
return new BigInteger(arg);
}
}
//本Computable的compute中主要是缓存逻辑,嵌套了通过构造函数引用的外部计算逻辑,将缓存逻辑和业务计算逻辑分离
@ThreadSafe
public class Memoizer1<A,V> implements Computable<A,V> {
@GuardedBy("this")
final Map<A,V> cache = new HashMap<>();
final Computable<A,V> c;
public Memoizerl(Computable<A, V> c) { this.c = c; }
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if(result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
程序清单5-17 用ConcurrentHashMap替换HashMap(串行变并发)
/**
* Memoizer2比Memoizer1有着更好的并发行为:多线程可以并发地使用它。
* 但是作为缓存时仍然存在着一些不足——当2个线程同时调用compute时存在一个漏洞:
* 可能会导致计算得到相同的值。在使用缓存的情况下,这只会带来低效,因为缓存的作用是避免相同的数据被计算多次。
* 如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在运行,那么很肯能会重复这个运算。
*/
//本Computable的compute中主要是缓存逻辑,嵌套了通过构造函数引用的外部计算逻辑,将缓存逻辑和业务计算逻辑分离
@ThreadSafe
class Memoizer2<A,V> implements Computable<A,V> {
@GuardedBy("this")
final Map<A,V> cache = new ConcurrentHashMap<>();
final Computable<A,V> c;
public Memoizer2(Computable<A, V> c) { this.c = c; }
@Override
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if(result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
程序清单5-18 基于Future的缓存封装器
/**
* 将计算过程放到Map缓存中,而map又是并发集合,真正的并发并不是方法中的FutureTask(虽然其实现了Runable接口)
* 而是外部的多线程并发调用Memoizer3的compute方法获取缓存的值,
* 若是compute中曾计算过某key且已经结束了,则直接出结果,
* 若是compute中曾计算过某key且还未结束,则阻塞等待先前的调用结束出结果,不再重复重头调用计算逻辑,
* 若是compute中未曾计算过某key,则在未来过程中执行计算逻辑,并等待结果
*/
//本Computable的compute中主要是缓存逻辑,嵌套了通过构造函数引用的外部计算逻辑,将缓存逻辑和业务逻辑计算分离
class Memoizer3<A,V> implements Computable<A,V> {
@GuardedBy("this")
final Map<A,Future<V>> cache = new ConcurrentHashMap<>();
final Computable<A,V> c;
public Memoizer3(Computable<A, V> c) { this.c = c; }
@Override
public V compute(A arg) throws InterruptedException {
Future<V> future = cache.get(arg);
if(future == null) {
Callable<V> callable = new Callable<V>() { //真正计算逻辑的调用
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> futureTask = new FutureTask<>(callable); //将计算逻辑封装到Future过程中
future = futureTask;
cache.put(arg, futureTask); //将计算逻辑的过程放到缓存中(无论计算是否执行完成,key已经在缓存中占了)
futureTask.run();//开启缓存的运算,这里将调用c.compute
}
V v = null;
try {
v = future.get(); //获取结果,若是Future过程已经计算完毕,直接出结果,还没计算完毕则阻塞等待
} catch (ExecutionException e) {
e.printStackTrace();
}
return v;
}
}
程序清单5-19 缓存的最终实现
class Memoizer4<A,V> implements Computable<A,V> {
@GuardedBy("this")
final Map<A,Future<V>> cache = new ConcurrentHashMap<>();
final Computable<A,V> c;
public Memoizer4(Computable<A, V> c) { this.c = c; }
@Override
public V compute(A arg) throws InterruptedException {
Future<V> future = cache.get(arg); //Future是接口,标准
/*
* if代码块仍然是非原子(nonatomic)的“先检查再执行”操作
* 因此2个线程仍然可能同一时间内调用compute来计算相同的值,
* 即二者都没有在缓存中找到期望的值,因此都开始计算
* 改进见Memoizer4
*/
if(future == null) {
Callable<V> callable = new Callable<V>() { //真正计算逻辑的调用
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> futureTask = new FutureTask<>(callable); //将计算逻辑封装到Future过程中,FutureTask是实现类
future = cache.putIfAbsent(arg, futureTask); //将计算逻辑的过程放到缓存中(无论计算是否执行完成,key已经在缓存中占了)
if(future == null)
future = futureTask;
futureTask.run();//开启缓存的运算,这里将调用c.compute
}
V v = null;
try {
v = future.get(); //获取结果,若是Future过程已经计算完毕,直接出结果,还没计算完毕则阻塞等待
} catch (ExecutionException e) {
e.printStackTrace();
}
return v;
}
}
当缓存的是Future而不是值时,Memoizer4未解决的问题:
- 缓存污染:如果Future被取消或失败,应将Future从缓存中删除
- 缓存逾期:可以通过使用FutureTask的子类解决。指定逾期时间
- 缓存清理:移除旧的缓存结果,为新的计算结果腾空间或设值新值
程序清单5-20在因式分解Servlet中使用Memoizer4来缓存结果
/**
* 分解因数缓存的终极实现,对比第一章实例
*
*/
@ThreadSafe
class Factorizer implements Servlet {
//1.自定义计算逻辑(因数分解)
Computable<BigInteger,BigInteger[]> c = new Computable<BigInteger, BigInteger[]>() {
@Override
public BigInteger[] compute(BigInteger arg) throws InterruptedException {
//因数分解,返回分解的因数数组
return factor(arg);
}
};
//1.自定义计算逻辑(因数分解)
Computable<BigInteger,BigInteger[]> cache = new Memoizer4<BigInteger, BigInteger[]>(c);
@Override
public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
BigInteger i = extractFromReqest(req);
BigInteger[] factors = factor(i);
encodeIntoResponse(res,cache.compute(i));
}
}