请带着如下问题阅读本文。
1.什么是原子操作?在Java Concurrency API中有哪些原子类(atomic classes)?
2.什么是AQS?
3.什么是Callable和Future?
4.什么是FutureTask?
5.Semaphore有什么作用?
6.ReentrantReadWriteLock读写锁的使用?
7.CyclicBarrier和CountDownLatch的用法及区别?
8.LockSupport工具?
9.Condition接口及其实现原理?
10.Fork/Join框架的理解?
Atomic
原子操作是指一个不受其他操作影响的操作任务单元。原子操作是在多线程环境下避免数据不一致必须的手段。
java.util.concurrent.atomic包提供了int和long类型的装类,它们可以自动的保证对于他们的操作是原子的并且不需要使用同步。
AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
通过原子的方式更新数组里的某个元素,Atomic包提供了以3类
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
如果需原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic包提供了以下3个类进行原子字段更新。
AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
public class AtomicIntegerFieldUpdaterTest {
// 创建原子更新器,并设置需要更新的对象类和对象的属性
private static AtomicIntegerFieldUpdater<User> a =
AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
public static void main(String[] args) throws InterruptedException {
// 设置柯南的年龄是10岁
User conan = new User("conan", 10);
// 柯南长了一岁,但是仍然会输出旧的年龄
System.out.println(a.getAndIncrement(conan));
// 输出柯南现在的年龄
System.out.println(a.get(conan));
}
public static class User {
private String name;
public volatile int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() {
return name;
}
public int getOld() {
return old;
}
}
}
AQS
为实现依赖于先进先出等待队列的阻塞锁和相关同步器提供一个框架。
AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。
AQS没有锁之类的概念,它有个state变量,是个int类型,在不同场合有着不同含义。
例如ReentrantLocky用它表示线程重入锁的次数,Semaphore用它表示剩余的许可数量,FutureTask用它表示任务的状态。对state变量值的更新都采用CAS操作保证更新操作的原子性。
使用
为了将此类用作同步器的基础,需要适当地重新定义以下方法,这是通过使用 getState()
、setState(int)
和/或 compareAndSetState(int, int)
方法来检查和/或修改同步状态来实现的:
默认情况下,每个方法都抛出 UnsupportedOperationException
。这些方法的实现在内部必须是线程安全的,通常应该很短并且不被阻塞。定义这些方法是使用此类的 唯一受支持的方式。其他所有方法都被声明为 <tt style="color: rgb(0, 0, 0); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">final</tt>,因为它们无法是各不相同的。
Callable,Future,FutureTask
Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。
Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法去在线程池中执行Callable内的任务。由于Callable任务是并行的,必须等待它返回的结果。java.util.concurrent.Future对象解决了这个问题。
在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法,等待Callable结束并获取它的执行结果。
代码示例
Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。
为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。
什么是FutureTask?
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
常用并发工具
CountDownLatch
CyclicBarrier
Semaphore
Exchanger
CountDownLatch
countDownLatch允许一个或多个线程等待其他线程完成操作.比如说有三个线程分别是老二,老大,老爸,这三个线程必须是老二吃好了,老大吃,老大吃完了,老爸吃,在20年钱,农村家里穷,一般有好吃的都是先留个最小的,然后才给其他兄弟姐妹吃,都不吃了,才由我们的父母吃,所以父母都不容易了,为了儿女,虽然是多个线程但是确实线性的,
这里要用JOIN。
如果是三个男的可以同时吃,吃完之后,再由女的吃。就可以用COUNTDOWNLATCH
计时器必须大于等于0,只是等于0的时候,计时器就是0,调用await()方法时,不会阻塞当前线程,CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部 计数器的值,不可逆性.
Semaphore
semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源.这个跟队列有点像,画图理解更直观,
Exchane
Exchanger主要用于二个线程之间交换数据,注意,只能是2个线程,如果有一个线程没执行exchange()方法,则会一直等待,线程就处于阻塞状态了!如果怕一直等待,可以设置时间:exchange()有一个重载的方法.
class Man extends Thread {
Exchanger<Integer> exchanger = null;
public Man(Exchanger<Integer> exchanger) {
super();
this.exchanger = exchanger;
}
@Override
public void run() {
Random rand = new Random();
int money = 0;
for(int i=0;i<4;i++){
money+=100000;//年薪在10万以内
try {
exchanger.exchange(money);//存钱
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 女人
*/
class Girl extends Thread {
Exchanger<Integer> exchanger = null;
int money = 0;
public Girl(Exchanger<Integer> exchanger) {
super();
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i=0;i<4;i++){
try {
money = exchanger.exchange(money) ;
System.out.println(money>300000?"亲爱的"+money+"万我们可以结婚了":money+"块这么少,"+"臭屌丝活该单身,还不去赚钱娶老婆");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier
CyclicBarrier,让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。
大家一致的意见都是CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
Condition
在java.util.concurrent包中,有两个很特殊的工具类,Condition和ReentrantLock,使用过的人都知道,ReentrantLock(重入锁)是jdk的concurrent包提供的一种独占锁的实现
我们知道在线程的同步时可以使一个线程阻塞而等待一个信号,同时放弃锁使其他线程可以能竞争到锁
在synchronized中我们可以使用Object的wait()和notify方法实现这种等待和唤醒
但是在Lock中怎么实现这种wait和notify呢?
答案是Condition,学习Condition主要是为了方便以后学习blockqueue和concurrenthashmap的源码,同时也进一步理解ReentrantLock。
正如WAIT 是需要用在SYNCHRONIZED 里面。CONDITION 也是需要和锁配合来用
结合一段代码来理解原理
Condition自己也维护了一个等待队列,该队列的作用是维护一个等待signal信号的队列,两个队列(AQS还有一个同步队列)的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的
线程1调用reentrantLock.lock时,线程被加入到AQS的同步队列中。
线程1调用await方法被调用时,该线程从AQS同步队列中移除,对应操作是锁的释放。
接着马上被加入到Condition的等待队列中,以为着该线程需要signal信号。
线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的同步队列中。
线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的同步队列中。 注意,这个时候,线程1 并没有被唤醒。
signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。
直到释放所整个过程执行完毕。
可以看到,整个协作过程是靠结点在AQS的同步队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的同步队列中来实现的唤醒操作。
LockSupport
LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()实现线程的阻塞和唤醒 的。
LockSupport 很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继 续 执行;如果许可已经被占用,当前线 程阻塞,等待获取许可。
相比较SUSPEND,他不会引起线程冻结。假设一个线程先UNPARK了你,你再PARK,你不会被挂起。
底层是调用UNSAFE.UNPARK来做。
Fork/Join 框架理解
Fork/Join是什么
Oracle的官方给出的定义是:Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。
我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。
比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。
工作窃取算法是指线程从其他任务队列中窃取任务执行(可能你会很诧异,这个算法有什么用。待会你就知道了)。考虑下面这种场景:有一个很大的计算任务,为了减少线程的竞争,会将这些大任务切分为小任务并分在不同的队列等待执行,然后为每个任务队列创建一个线程执行队列的任务。那么问题来了,有的线程可能很快就执行完了,而其他线程还有任务没执行完,执行完的线程与其空闲下来不如帮助其他线程执行任务,这样也能加快执行进程。所以,执行完的空闲线程从其他队列的尾部窃取任务执行,而被窃取任务的线程则从队列的头部取任务执行(这里使用了双端队列,既不影响被窃取任务的执行过程又能加快执行进度)。
Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。