回顾
首先我们回顾一下前面四节所讲的东西
1.线程的基本概念
2.synchronized,底层实现原理,锁升级(无锁-偏向锁-轻量级锁-重量级锁)
3.volatile,线程隔离可见性,禁止指令重排序
4.AtomicXXX
5.各种UC同步框架(ReentrantLock,CountDownLatch,CyclicBarrier,Phaser,ReadWriteLock,Semaphore,Exchange)
6.synchronized 和 各种线程同步框架的ReenrantLock有何不同?
6.1.synchronizec:系统自带,系统自动加锁,自动解锁,不可以出现多个不同的等待队列,默认进行四种锁的升级
6.2.ReentrantLock:需要自动加锁,手动解锁,可以出现多个不同的等待队列,CAS的实现
本章我们补一个LockSupport,然后分析两个面试题,紧接着我会教大家阅读源码的技巧,最后拿AQS作源码分析
2.LockSupport
我们下面以几个小程序案例,对LockSupport进行详细解释,在以前我们要阻塞和唤醒某一个具体的线程有很多的限制,比如
因为wait方法需要释放锁,所以必须在synchronized中使用,否则会抛出异常lllegalMonitorStateException
nofity方法必须在synchronized中使用,并且应该指定对象
synchronized,nofity,wait的对象必须一致,一个synchronized代码块中只能有一个线程用wait和nofity
以上诸多的使用不便,带来了lockSupport的好处
先来看第一个小程序
package com.learn.thread.four;
import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
public class TestLockSupport {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
// 使用LockSupport进行线程堵塞
if (i == 5) {
LockSupport.park();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
});
thread.start();
}
}
上述代码看起来是比较灵活的,可以手动控制是否上锁,通过LockSupprt.park加锁,有了加锁机制,LockSupport也提供了令牌解锁机制unpark
package com.learn.thread.four;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
public class TestLockSupport2 {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
// 使用LockSupport进行线程堵塞
if (i == 5) {
LockSupport.park();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
});
thread.start();
// 唤醒线程t,注意这里是比park先执行的,当前与释放了一个令牌,当park拿到这个令牌后不阻塞直接放行
LockSupport.unpark(thread);
}
}
注意,这里是一个park对应着一个unpark
package com.learn.thread.four;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class TestLockSupport3 {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
// 使用LockSupport进行线程堵塞
if (i == 5) {
LockSupport.park();
}
// 使用LockSupport进行线程堵塞(前面只会释放一次锁,这里会再次阻塞)
if (i == 8) {
LockSupport.park();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
});
thread.start();
LockSupport.unpark(thread);
}
}
2.1.由以上三个小程序,我们可以总结出以下几点
LockSupport不需要synchronized就可以实现线程的阻塞和唤醒
LockSupport.unpark可以线程LockSupport.park,并且线程不会阻塞
如果一个线程处于等待状态,连续调用了两次park方法,就会使该线程永远无法唤醒
LockSupport中的park 和unpark的实现原理
其实park()和unpark()的实现也是由Unsefa类提供的,而Unsefa类是由C和c++语言完成的,其实原理也是可以理解,它主要是通过一个变量作为标志,变量在0-1之间来回切换,当这个变量大于0的时候线程就获得了“令牌”(unpark的操作),相反park的操作就是将这个变量变成0,用于识别令牌。
2.2.淘宝面试题1
实现一个容器,提供两个方法add和size 写两个线程
线程1:添加十个元素到容器中
线程2:实时监控元素个数,当个数到5个时,线程2给出提示并结束
小程序1
我们先来分析一下小程序1的执行流程:
通过内部的List来new 一个ArraryList,在自定义的add方法直接调用list的add方法,在自定义的size方法调用list的size方法。然后小程序初始化这个List,启动线程t1来做一个循环,每次循环都添加一个对象,加一个打印,然后间隔一秒,在t2线程写一个while循环,实时监控集合中对象的变换,如果数量达到5就结束t2的线程。
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
public class TestListAdd1 {
List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
public static void main(String[] args) {
TestListAdd1 testListAdd1 = new TestListAdd1();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
testListAdd1.add(new Object());
System.out.println(i);
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
// t2 线程监听集合的数量,发现获取不到数量为5的时候,原因是线程之间不可见的原因,
// 因为t1 添加完对象之后,肯定会更新size 方法,但是在更新size的过程中,t2线程已经开始读了size方法,造成了数据不一致
new Thread(() -> {
while (true) {
if (testListAdd1.size() == 5 ) {
break;
}
}
System.out.println("t2 结束");
},"t2").start();
}
}
结论:
方法并没有按预期的执行,这是因为,ArraryList的Size方法肯定要更新的,但是还没有更新完,线程2就读了,所以这时候永远不会检测到List的长度为5了,原因就是线程之间的不可见因素
小程序2
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
public class TestListAdd2 {
// 这里用volatile修饰引用类型,发现并没有做到线程的之间的可见性,因为引用对象指向的是一个地址,
// 如果这个对象的内部值被改变了,是无法被观察到的
volatile List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
public static void main(String[] args) {
TestListAdd2 testListAdd2 = new TestListAdd2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
testListAdd2.add(new Object());
System.out.println(i);
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
// t2 线程监听集合的数量,发现获取不到数量为5的时候,原因是线程之间不可见的原因,
new Thread(() -> {
while (true) {
if (testListAdd2.size() == 5 ) {
break;
}
}
System.out.println("t2 结束");
},"t2").start();
}
}
结论:
小程序2在小程序1的基础上加上了线程可见volitate修饰,但是还是无法满足需求,这是因为volitate要去修饰普通的值,不要去修饰引用值,因为修饰引用类型,这个引用对象指向的是另外一个new出来的对象,如果这个对象里边的成员对象改变了,是无法被观察到的。
下面自行写了个测试修饰引用类型的demo
package com.learn.thread.four.list;
import com.learn.thread.first.T;
public class Test {
private volatile static int a = 0;
// private volatile static Integer a = 0;
public void test() {
a++;
System.out.println(a);
}
public static void main(String[] args) {
Thread[] threads = new Thread[100];
Test test = new Test();
for (int i = threads.length - 1; i >= 0; i--) {
threads[i] = new Thread(() -> test.test());
threads[i].start();
}
}
}
小程序3
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
/**
* 用对象锁
* wait,和nofity实现
*/
public class TestListAdd3 {
private List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
public static void main(String[] args) {
TestListAdd3 testListAdd3 = new TestListAdd3();
final Object lock = new Object();
// t2 要早于t1启动
new Thread(() -> {
synchronized (lock) {
System.out.println("t2 启动");
if (testListAdd3.size() != 5 ) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
}
},"t2").start();
new Thread(() -> {
synchronized (lock) {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
testListAdd3.add(new Object());
System.out.println(i);
if (i == 5) {
// 此处有坑
// 特别注意,nofity并不释放当前锁,t1 会继续往下走,等t1 执行完了, t2 才会拿到这把对象锁
lock.notify();
}
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
}
}
结论:
小程序3 也是行不通,原因是nofity方法不释放锁,当t1线程调用nofity方法,并没有释放当前锁,所以t1还是会继续运行,等待t1执行完毕,t2才会继续执行,这个时候当前List不只有5个了。
小程序4
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
public class TestListAdd4 {
private List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
public static void main(String[] args) {
TestListAdd4 testListAdd4 = new TestListAdd4();
final Object lock = new Object();
// t2 要早于t1启动
new Thread(() -> {
synchronized (lock) {
System.out.println("t2 启动");
if (testListAdd4.size() != 5 ) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
// 释放当前锁,唤醒t1
lock.notify();
}
},"t2").start();
new Thread(() -> {
synchronized (lock) {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
testListAdd4.add(new Object());
System.out.println(i);
if (i == 5) {
// 此处有坑
// 特别注意,nofity并不释放当前锁,t1 会继续往下走,等t1 执行完了, t2 才会拿到这把对象锁
// 所以下面加多一个操作,让此线程等待,阻塞此线程,让t2 去执行
lock.notify();
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
}
}
结论
小程序4基本上满足的需求,在小程序3的基础上了,nofity之后让本线程等待,让给t2执行,然后t2执行完毕,释放nofity,回到t1执行。
小程序5
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 这里使用CountDown实现
*/
public class TestListAdd5 {
private List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
public static void main(String[] args) {
TestListAdd5 testListAdd5 = new TestListAdd5();
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
System.out.println("t2 启动");
if (testListAdd5.size() != 5) {
try {
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("t2 结束");
}, "t2").start();
new Thread(() -> {
System.out.println("t1 启动");
for (int i =0; i < 10; i++) {
testListAdd5.add(new Object());
System.out.println(i);
if (testListAdd5.size() == 5) {
// 暂停t1 线程, 打开门阀
countDownLatch.countDown();
}
// 如果这段代码去除了,就不会给t2时间去执行,所以这里需要等待一会
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1 ").start();
}
}
结论:
从执行结果来看,并没有多大的问题,但是如果我们把休眠一秒的代码去掉,会发现,执行结果不正确,这事因为t1线程对象增加到5个时,t2的线程门阀确实被打开了,但是t1线程马上又会接着执行,t1之前是休眠1秒,给t2线程执行时间,如果注释掉这段代码,t2就没有机会去实时监控了。
小程序6
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class TestListAdd6 {
private List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
public static void main(String[] args) throws InterruptedException {
TestListAdd6 testListAdd6 = new TestListAdd6();
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
System.out.println("t2 启动");
if (testListAdd6.size() != 5) {
try {
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("t2 结束");
}, "t2").start();
new Thread(() -> {
System.out.println("t1 启动");
for (int i =0; i < 10; i++) {
testListAdd6.add(new Object());
System.out.println(i);
if (testListAdd6.size() == 5) {
// 打开门阀
countDownLatch.countDown();
// 等待
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
结论
这段代码很好理解,就是在t1线程打开门阀的时候,给自己再加一个门阀,但是这个线程睡眠代码,还是不能去除。
小程序7
package com.learn.thread.four.list;
import com.learn.thread.first.T;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
public class TestListAdd7 {
private List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
static Thread t2 = null;
static Thread t1 = null;
public static void main(String[] args) {
TestListAdd7 testListAdd7 = new TestListAdd7();
t1 = new Thread(() -> {
System.out.println("t1 启动了");
for (int i =0; i < 10; i++) {
testListAdd7.add(new Object());
System.out.println(i);
if (testListAdd7.size() == 5) {
LockSupport.unpark(t2);
LockSupport.park();
}
}
},"t1");
t2 = new Thread(() -> {
System.out.println("t2 启动了");
if (testListAdd7.size() != 5) {
LockSupport.park();
}
System.out.println("t2 结束");
LockSupport.unpark(t1);
},"t2");
t2.start();
t1.start();
}
}
结论:
这次跟之前也是大同小异,只不过锁的方式不一样罢了
小程序8
package com.learn.thread.four.list;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class TestListAdd9 {
private List<Object> list = new ArrayList<Object>(16);
public void add(Object object) {
this.list.add(object);
}
public int size() {
return this.list.size();
}
static Thread t1,t2 = null;
public static void main(String[] args) {
TestListAdd8 testListAdd8 = new TestListAdd8();
Semaphore semaphore = new Semaphore(1);
t1 = new Thread(() -> {
// 保证只有一个线程执行
try {
semaphore.acquire();
for (int i = 0; i < 5; i++) {
testListAdd8.add(new Object());
System.out.println(i);
}
// 释放当前线程,让别的线程可以加入
semaphore.release();
t2.start();
t2.join();
// 此线程继续执行
semaphore.acquire();
for (int i = 5; i < 10; i++) {
testListAdd8.add(new Object());
System.out.println(i);
}
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t2 = new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 结束");
semaphore.release();
});
t1.start();
}
}
结论
本次是一次牵强的方法,用Semaphre限制每次只有一个线程执行,当t1增加到4的时候,释放当前线程,并且将cpu交给t2执行,这时候t2也是保证只有一个线程执行,执行完后立马释放,t1调用join完之后可以继续获得Semaphre的锁,然后继续增加对象,最后释放线程。
针对以上8个小程序,我们分别用了volitate,wait,nofity,Semphore,CountDownLatch,LockSupport,其中wait和nofity要牢牢掌握
2.3.淘宝面试题2
写一个固定容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程阻塞调用
小程序1
package com.learn.thread.four.prodducer;
import java.util.LinkedList;
public class TestProducer<T> {
private final LinkedList<T> list = new LinkedList<T>();
public final int MAX = 10;
private int count = 0;
public synchronized void put(T t) {
// 想想这里什么用while
// 这是因为当linkedList集合中的个数达到最大值得时候,if判断了集合的大小等于MAX
// 调用了wait方法,它不会再去判断一次,而是继续往下走,假如wait以后,有别的生产者线程添加数据,那么,这里就没有
// 再次判断,又添加了一次,造成了数据错误
while (list.size() == MAX) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
++ count;
System.out.println("生产者生产 " + count);
// 唤醒所有在队列中等待的线程
this.notifyAll();
}
public synchronized T get() {
T t = null;
while (list.size() == 0) {
try {
this.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "消费");
t = list.removeFirst();
count --;
System.out.println(count);
this.notifyAll();
return t;
}
public static void main(String[] args) {
TestProducer<String> testProducer = new TestProducer<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(testProducer.get());
}
}, "c" + i).start();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
testProducer.put(Thread.currentThread().getName() + "" + j);
}
}, "p" + i).start();
}
}
}
这里有一个很遗憾的问题,那就是nofityAlll,如果生产者生产完,是有可能唤醒一个还是生产者的线程,所以这里可以做一个优化
小程序2
package com.learn.thread.four.prodducer;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestProducer2<T> {
private final LinkedList<T> list = new LinkedList<T>();
public final int MAX = 10;
private int count = 0;
private ReentrantLock lock = new ReentrantLock();
Condition producer = lock.newCondition();
Condition comsumer = lock.newCondition();
public void put(T t) {
// 想想这里什么用while
// 这是因为当linkedList集合中的个数达到最大值得时候,if判断了集合的大小等于MAX
// 调用了wait方法,它不会再去判断一次,而是继续往下走,假如wait以后,有别的生产者线程添加数据,那么,这里就没有
// 再次判断,又添加了一次,造成了数据错误
try {
lock.lock();
while (list.size() == MAX) {
try {
producer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
++ count;
System.out.println("生产者生产 " + count);
// 唤醒所有消费者等待的线程
comsumer.signalAll();
}catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
try {
lock.lock();
while (list.size() == 0) {
try {
comsumer.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "消费");
t = list.removeFirst();
count --;
System.out.println(count);
producer.signalAll();
}catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
TestProducer2<String> testProducer = new TestProducer2<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(testProducer.get());
}
}, "c" + i).start();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
testProducer.put(Thread.currentThread().getName() + "" + j);
}
}, "p" + i).start();
}
}
}
这里ReentrantLock的优势就体现出来了,它可以有多种情况Condition,在put方法了达到了峰值就是生产者producer.await,反之就是comsumer.await
ReentrantLock的本质就是synchronzied里调用wait和nofity的时候,它只有一个等待队列,但是用了Condition就是多个等待队列。当我们使用producer.await的时候,就是进入了producer的等待队列,producer.signalAll指的是唤醒producer这个等待队列的线程。