Cyclicbarrier也是处理多线程并发的手段之一,就是多个线程同时执行,要等到彼此都执行完后再进行下一步操作,使用方式很简单,网上例子很多,这里我贴一个:
https://www.baeldung.com/java-cyclic-barrier
我们这里是要对这个类的实现源码进行解读,看看它是怎么实现的,首先看看Cyclicbarrier有哪些功能(方法,只看public的,因为只有public才是对外的功能,并且忽略构造函数):
await:这个是最最主要的方法,功能是等到所有线程(代码中变量为parties)执行完毕。
isBroken: 次要的方法,检查该Cyclicbarrier的状态是否破损
reset:特色方法,用于重置整个Cyclicbarrier的状态,因为Cyclicbarrier会维持一些变量,这些变量会在对象方法执行过程中有变化,这个方法会把这些变量重置到初始化状态,这样这个Cyclicbarrier对象就可以重复使用了。
getNumberWaiting:返回正在等待的线程的数量,就是有多少个线程已经完成了操作,正在等待所有线程执行完毕。
好哒,明眼人都看得出来await这个方法是关键,看懂了这个方法就完全搞清楚这个类啦。
await是多态方法有两种形式,有参数和没参数的:
public intawait()throwsInterruptedException,BrokenBarrierException {
try{
returndowait(false,0L);
}catch(TimeoutException toe) {
throw newError(toe);// cannot happen
}
}
和
public intawait(longtimeout,TimeUnit unit)
throwsInterruptedException,
BrokenBarrierException,
TimeoutException {
returndowait(true,unit.toNanos(timeout));
}
第二种其实就是给await加个超时,就是说在某些情况下线程无法等到所有其他线程都执行完,因为某些线程可能不能等太久,于是就加个超时,常规操作嘛,可以看到它们其实都调用的dowait方法:
private intdowait(booleantimed, longnanos)
throwsInterruptedException,BrokenBarrierException,
TimeoutException {
finalReentrantLock lock =this.lock;
lock.lock();
try{
finalGeneration g =generation;
if(g.broken)
throw newBrokenBarrierException();
if(Thread.interrupted()) {
breakBarrier();
throw newInterruptedException();
}
intindex = --count;
if(index ==0) {// tripped
booleanranAction =false;
try{
finalRunnable command =barrierCommand;
if(command !=null)
command.run();
ranAction =true;
nextGeneration();
return0;
}finally{
if(!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for(;;) {
try{
if(!timed)
trip.await();
else if(nanos >0L)
nanos =trip.awaitNanos(nanos);
}catch(InterruptedException ie) {
if(g ==generation&& ! g.broken) {
breakBarrier();
throwie;
}else{
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if(g.broken)
throw newBrokenBarrierException();
if(g !=generation)
returnindex;
if(timed && nanos <=0L) {
breakBarrier();
throw newTimeoutException();
}
}
}finally{
lock.unlock();
}
}
好几十行哈,看着挺吓人,但其实里面的异常和错误处理都占了一大半=。=,我们平时写代码异常处理太少的,站出来反思一下哈,代码的鲁棒性肯定不强。。。。
我来手动剔除所有的异常和错误处理,来看清它主要做的事情:
private intdowait(booleantimed, longnanos)
throwsInterruptedException,BrokenBarrierException,
TimeoutException {
finalReentrantLock lock =this.lock;
lock.lock();
try{
finalGeneration g = generation;
intindex = --count;
if(index ==0) {// tripped
finalRunnable command = barrierCommand;
if(command !=null)
command.run();
nextGeneration();
return0;
}
// loop until tripped, broken, interrupted, or timed out
for(; ;) {
if(!timed)
trip.await();
else if(nanos >0L)
nanos = trip.awaitNanos(nanos);
}
}finally{
lock.unlock();
}
}
是不是短了一大半(当然那些异常错误处理都是必须的)?ok, 首先我们线程第一步就是要获取CyclicBarrier对象的锁,因为线程会涉及到对CyclicBarrier对象里的变量进行操作,这里是用锁机制来保证同步。
忘了把CyclicBarrier里的所有变量贴出来了:
private static classGeneration {
booleanbroken=false;
}
/** The lock for guarding barrier entry */
private finalReentrantLocklock=newReentrantLock();
/** Condition to wait on until tripped */
private finalConditiontrip=lock.newCondition();
/** The number of parties */
private final intparties;
/* The command to run when tripped */
private finalRunnablebarrierCommand;
/** The current generation */
privateGenerationgeneration=newGeneration();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private intcount;
上面说的锁就是ReentrantLock,我有一篇专门写ReentrantLock源码解析的文章:
https://www.jianshu.com/p/b43c9f62ceb1
好的, 获取了锁以后,我们会去判断当前线程是不是最后一个等待的线程,如果只剩它一个了的话,就会停止了,然后执行传到CyclicBarrier对象里的线程,并且开启一个新的循环(其实也就是把CyclicBarrier对象的所有参数设置为初始化时的状态)。然后就完事了~
错误处理就几句话简单带过,就是当有线程被中断后,其他线程就会停止等待了,CyclicBarrier对象也会被重置,不用担心永久等待的问题。
所以对比CountDownLatch, CyclicBarrier的好处就是CyclicBarrier对象可以复用~所以在需要复用的情况下(目前我没遇到过)就用CyclicBarrier吧~