核心概述:本篇我们将继续学习Java中的多线程,其中有多线程的等待唤醒机制、Condition接口的使用、Java中的线程池、Timer定时器以及ConcurrentHashMap的使用。
第一章:等待唤醒机制
1.1-线程间的通信(了解)
什么是线程之间的通信呢?
就是多个线程在处理同一个资源,但是处理的动作(线程的任务)却不相同。
比如:线程A用来生成包子的,线程B用来吃包子的,包子可以理解为同一资源,线程A与线程B处理的动作,一个是生产,一个是消费,那么线程A与线程B之间就完成了通信,其实就是一种协作关系。
为什么要处理线程间的通信?
多个线程并发执行时, 在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成一件任务,并且我们 希望他们有规律的执行, 那么多线程之间需要一些协调通信,以此来帮我们达到多线程共同操作一份数据。
如何保证线程间通信有效利用资源?
多个线程在处理同一个资源,并且任务不同时,需要线程通信来帮助解决线程之间对同一个变量的使用或操作。 就是多个线程在操作同一份数据时, 避免对同一共享变量的争夺。也就是我们需要通过一定的手段使各个线程能有效的利用资源。而这种手段即—— 等待唤醒机制。
1.2-什么是等待唤醒机制(了解)
这是多个线程间的一种协作机制。谈到线程我们经常想到的是线程间的竞争(race),比如去争夺锁,但这并不是故事的全部,线程间也会有协作机制。就好比在公司里你和你的同事们,你们可能存在在晋升时的竞争,但更多时候你们更多是一起合作以完成某些任务。
就是在一个线程进行了规定操作后,就进入等待状态(wait()), 等待其他线程执行完他们的指定代码过后 再将其唤醒(notify());在有多个线程进行等待时, 如果需要,可以使用 notifyAll()来唤醒所有的等待线程。
wait/notify 就是线程间的一种协作机制。
1.3-等待唤醒相关方法(重要)
线程等待和唤醒的方法定义在java.lang.Object
类中。
wait方法
当调用wait方法后,线程不再活动,不再参与调度,进入 wait set 中,因此不会浪费 CPU 资源,也不会去竞争锁了,这时的线程状态即是 WAITING。它还要等着别的线程执行一个特别的动作,也即是“通知(notify)”在这个对象上等待的线程从wait set 中释放出来,重新进入到调度队列(ready queue)中。
notify方法
当调用notify方法后,则选取所通知对象的 wait set 中的一个线程释放;例如,餐馆有空位置后,等候就餐最久的顾客最先入座。
notifyAll方法
当调用notifyAll方法后,则释放所通知对象的 wait set 上的全部线程。
注意事项
注意事项1:
哪怕只通知了一个等待的线程,被通知线程也不能立即恢复执行,因为它当初中断的地方是在同步块内,而此刻它已经不持有锁,所以她需要再次尝试去获取锁(很可能面临其它线程的竞争),成功后才能在当初调用 wait 方法之后的地方恢复执行。
总而言之,如果能获取锁,线程就从 WAITING 状态变成 RUNNABLE 状态;否则,从 wait set 出来,又进入 entry set,线程就从 WAITING 状态又变成 BLOCKED 状态
注意事项2:
- wait方法与notify方法必须要由同一个锁对象调用。因为:对应的锁对象可以通过notify唤醒使用同一个锁对象调用的wait方法后的线程。
- wait方法与notify方法是属于Object类的方法的。因为:锁对象可以是任意对象,而任意对象的所属类都是继承了Object类的。
- wait方法与notify方法必须要在同步代码块或者是同步函数中使用。因为:必须要通过锁对象调用这2个方 法。
1.4-案例(练习)
等待唤醒机制其实就是经典的“生产者与消费者”的问题。
就拿生产包子消费包子来说等待唤醒机制如何有效利用资源
需求
定义一个变量,包子铺线程完成生产包子,包子进行++操作;吃货线程完成购买包子,包子变量打印出来。
- 当包子没有时(包子状态为false),吃货线程等待。
- 包子铺线程生产包子(即包子状态为true),并通知吃货线程(解除吃货的等待状态)。
- 保证线程安全,必须生产一个消费一个,不能同时生产或者消费多个。
代码
包子铺类
public class BaoZiPu {
private int baoZiCount;
//标志位变量
//当包子没有时(包子状态为false),吃货线程等待。
//包子铺线程生产包子(即包子状态为true),并通知吃货线程(解除吃货的等待状态)。
private boolean flag;
public void setFlag(boolean flag){
this.flag = flag;
}
public boolean getFlag(){
return flag;
}
//消费者调用方法,变量输出
public void get(){
System.out.println("消费第"+baoZiCount+"个包子");
}
//生产者调用方法,变量++
public void set(){
baoZiCount++;
System.out.println("生产第"+baoZiCount+"个包子");
}
}
生产者类
public class Product implements Runnable{
private BaoZiPu baoZiPu;
public Product(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//生产者线程判断标志位变量,==true,已经生产还没有消费
if(baoZiPu.getFlag() == true){
try {
//线程等待
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产一个
baoZiPu.set();
//修改标志位
baoZiPu.setFlag(true);
//唤醒对方线程
notify();
}
}
}
}
消费者类
public class Customer implements Runnable {
private BaoZiPu baoZiPu;
public Customer(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//消费者线程判断标志位,==false,没有生产
if(baoZiPu.getFlag()==false) {
try {
//线程等待
wait();
} catch (InterruptedException ex) {
}
}
//调用消费方法
baoZiPu.get();
//修改标志位
baoZiPu.setFlag(false);
//唤醒对方线程
notify();
}
}
}
}
测试类
public class Test{
public static void main(String[] args) {
BaoZiPu baoZiPu = new BaoZiPu();
Product product = new Product(baoZiPu);
Customer customer = new Customer(baoZiPu);
new Thread(product).start();
new Thread(customer).start();
}
}
执行结果
异常分析
- 程序出现无效的监视器状态异常。
- wait()或者notify()方法会抛出此异常。
- 程序中,wait()或者notify()方法的调用者是this对象。
- 而this对象在同步中并不是锁对象,只有作为锁的对象才能调用wait()或者notify()方法。
- 而锁对象是生产者和消费者共享的包子铺对象。
代码改造
生产者类
public class Product implements Runnable{
private BaoZiPu baoZiPu;
public Product(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//生产者线程判断标志位变量,==true,已经生产还没有消费
if(baoZiPu.getFlag() == true){
try {
//线程等待
baoZiPu.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产一个
baoZiPu.set();
//修改标志位
baoZiPu.setFlag(true);
//唤醒对方线程
baoZiPu.notify();
}
}
}
}
消费者类
public class Customer implements Runnable {
private BaoZiPu baoZiPu;
public Customer(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//消费者线程判断标志位,==false,没有生产
if(baoZiPu.getFlag()==false) {
try {
//线程等待
baoZiPu.wait();
} catch (InterruptedException ex) {
}
}
//调用消费方法
baoZiPu.get();
//修改标志位
baoZiPu.setFlag(false);
//唤醒对方线程
baoZiPu.notify();
}
}
}
}
代码优化
通过线程等待与唤醒,实现了生产者与消费者案例,但是代码维护性差,阅读性差,使用同步方法进行代码的优化。在包子铺类中的get(),set()方法进行同步方法的改进。
注意:一旦方法同步后,this就是锁对象。
包子铺类:变量flag只在类中使用,因此可以去掉get/set方法。
包子铺类
public class BaoZiPu {
private int baoZiCount;
//标志位变量
//当包子没有时(包子状态为false),吃货线程等待。
//包子铺线程生产包子(即包子状态为true),并通知吃货线程(解除吃货的等待状态)。
private boolean flag;
//消费者调用方法,使用同步
public synchronized void get(){
//判断标志位 ==false,没有生产,线程等待
if (flag == false)
try {
this.wait();
}catch (InterruptedException ex){}
System.out.println("消费第"+baoZiCount+"个包子");
//修改标志位
flag = false;
//唤醒对方线程
this.notify();
}
//生产者调用方法,变量++,使用同步
public synchronized void set(){
//判断标志位,==true,没有消费,线程等待
if(flag == true)
try {
this.wait();
}catch (InterruptedException ex){}
baoZiCount++;
System.out.println("生产第"+baoZiCount+"个包子");
//修改标志位
flag = true;
//唤醒对方线程
this.notify();
}
}
生产者类
public class Product implements Runnable{
private BaoZiPu baoZiPu;
public Product(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
baoZiPu.set();
}
}
}
消费者类
public class Customer implements Runnable {
private BaoZiPu baoZiPu;
public Customer(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
baoZiPu.get();
}
}
}
1.5-sleep()方法和wait()方法的区别(了解)
- sleep()是Thread类静态方法,不需要对象锁。
- wait()方法是Object类的方法,被锁对象调用,而且只能出现在同步中。
- 执行sleep()方法的线程不会释放同步锁。
- 执行wait()方法的线程要释放同步锁,被唤醒后还需获取锁才能执行。
1.6-多生产者多消费者(了解)
概述
上一练习中,我们实现了生产者和消费者案例,但是如果我们开启多个生产者线程和多个生产者线程会发生什么现象呢,线程还会安全吗?
线程安全原因分析
当开启了多个线程后,数据出现了安全问题。问题就出现在等待和唤醒环节。我们将线程分成了生产者和消费者两个部分,需要生产者线程唤醒消费者线程,而消费者线程要唤醒生产者线程。但是线程的唤醒是按照队列形式进行,先等待的会先被唤醒。很可能出现生产者线程又唤醒了生产者线程,消费者线程唤醒了消费者线程。因此我们需要将线程全部唤醒,使用notifyAll()方法。
全部唤醒后,线程依然不安全,是因为线程判断完标志位后就会等待,当被唤醒后,就不会再判断标志位了,我们必须让线程在唤醒后,还要继续判断标志位,允许生存才能生产,不运行生产就要继续等待。
改造代码实现多生产和多消费
包子铺类
public class BaoZiPu {
private int baoZiCount;
//标志位变量
//当包子没有时(包子状态为false),吃货线程等待。
//包子铺线程生产包子(即包子状态为true),并通知吃货线程(解除吃货的等待状态)。
private boolean flag;
//消费者调用方法,使用同步
public synchronized void get(){
//判断标志位 ==false,没有生产,线程等待
while (flag == false)
try {
this.wait();
}catch (InterruptedException ex){}
System.out.println("消费第"+baoZiCount+"个包子");
//修改标志位
flag = false;
//唤醒对方线程
this.notifyAll();
}
//生产者调用方法,变量++,使用同步
public synchronized void set(){
//判断标志位,==true,没有消费,线程等待
while(flag == true)
try {
this.wait();
}catch (InterruptedException ex){}
baoZiCount++;
System.out.println("生产第"+baoZiCount+"个包子");
//修改标志位
flag = true;
//唤醒对方线程
this.notifyAll();
}
}
第二章:Condition接口
2.1-等待唤醒的弊端(了解)
多生产与多消费案例中,我们使用了线程通信的相关方法wait()和notify(),notifyAll()。
public final native void wait(long timeout) throws InterruptedException
public final native void notify()
public final native void notifyAll()
以上三个方法都是本地方法,要和操作系统进行交互,因此线程等待唤醒需要消耗系统资源,程序效率降低。另外我们一次唤醒所有的线程,也会浪费很多资源,为了解决这些弊端,JDK1.5版本的时候出现了Lock接口和Condition接口。
2.2-Condition接口(重点)
介绍
Condition
将 Object
监视器方法(wait
、notify
和 notifyAll
)分解成截然不同的对象,以便通过将这些对象与任意 Lock
实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock
替代了synchronized
方法和语句的使用,Condition
替代了Object
监视器方法的使用。
获取Condition对象
Lock接口的方法newCondition()获取
public Condition newCondition()
Condition对象常用方法
Condition接口方法和Object类方法比较
- Condition可以和任意的Lock组合,实现管理线程的阻塞队列(直接在内存重操作)。
- 一个线程的案例中,可以使用多个Lock锁,每个Lock锁上可以结合Condition对象。
- synchronized同步中做不到将线程划分到不同的队列中(需要本地方法[c++编写]和操作系统交互)。
- Object类wait()和notify()都要和操作系统交互,并通知CPU挂起线程,唤醒线程,效率低。
- Condition接口方法await()不和操作系统交互,而是让释放锁,并存放到线程队列容器中(内存总),当被signal()唤醒后,从队列中出来,从新获取锁后在执行。
- 因此使用Lock和Condition的效率比Object要快很多。
生产者与消费者案例改进
包子铺类
public class BaoZiPu {
private int baoZiCount;
//标志位变量
//当包子没有时(包子状态为false),吃货线程等待。
//包子铺线程生产包子(即包子状态为true),并通知吃货线程(解除吃货的等待状态)。
private boolean flag;
//创建Lock接口实现类,线程安全提供锁定
private Lock lock = new ReentrantLock();
//Condition对象和生产者锁结合
private Condition productCondition = lock.newCondition();
//Condition对象和消费者锁结合
private Condition customerCondition = lock.newCondition();
public void setFlag(boolean flag){
this.flag = flag;
}
public boolean getFlag(){
return flag;
}
//消费者调用方法,消费者Lock对象锁定
public void get(){
lock.lock();
//判断标志位 ==false,没有生产,线程等待
while (flag == false)
try {
customerCondition.await();
}catch (InterruptedException ex){}
System.out.println("消费第"+baoZiCount+"个包子");
//修改标志位
flag = false;
//唤醒对方线程
productCondition.signal();
lock.unlock();
}
//生产者调用方法,变量++,生产者Lock对象锁定
public void set(){
lock.lock();
//判断标志位,==true,没有消费,线程等待
while(flag == true)
try {
productCondition.await();
}catch (InterruptedException ex){}
baoZiCount++;
System.out.println("生产第"+baoZiCount+"个包子");
//修改标志位
flag = true;
//唤醒对方线程
customerCondition.signal();
lock.unlock();
}
}
第三章:线程池
3.1-概述(了解)
线程池思想
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
在Java中可以通过线程池来达到这样的效果。
什么是线程池
其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源。
合理使用线程池的好处
- 降低资源消耗。减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
3.2-使用线程池(重点)
java.util.concurrent
包中定义了线程池相关的类和接口。
Java里面线程池的顶级接口是 java.util.concurrent.Executor
,但是严格意义上讲 Executor 并不是一个线程 池,而只是一个执行线程的工具。真正的线程池接口是 java.util.concurrent.ExecutorService
。
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在 java.util.concurrent.Executors 线程工厂类里面提供了一些静态工厂,生成一些常用的线程池。官方建议使用Executors工程类来创建线程池对象。
Executors类
创建线程池对象的工厂方法,使用此类可以创建线程池对象。
ExecutorService接口
线程池对象的管理接口,提交线程任务,关闭线程池等功能。
Callable接口
线程执行的任务接口,类似于Runnable接口。
- 接口方法
public V call()throw Exception
- 线程要执行的任务方法
- 比起run()方法,call()方法具有返回值,可以获取到线程执行的结果。
Future接口
异步计算结果,就是线程执行完成后的结果。
- 接口方法
public V get()
获取线程执行的结果,就是获取call()方法返回值。
示例代码
需求:创建有2个线程的线程池,分别提交线程执行的任务,一个线程执行字符串切割,一个执行1+100的和。
实现Callable接口,字符串切割功能:
public class MyStringCallable implements Callable<String[]> {
private String str;
public MyStringCallable(String str ){
this.str = str;
}
@Override
public String[] call() throws Exception {
return str.split(" +");
}
}
实现Callable接口,1+100求和:
public class MySumCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int x = 1 ; x<= 100; x++){
sum+=x;
}
return sum;
}
}
测试类:
public static void main(String[] args) throws Exception {
//创建有2个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
//提交执行字符串切割任务
Future<String[]> futureString = executorService.submit(new MyStringCallable("aa bbb cc d e"));
System.out.println(Arrays.toString(futureString.get()));
//提交执行求和任务
Future<Integer> futureSum = executorService.submit(new MySumCallable());
System.out.println(futureSum.get());
executorService.shutdown();
}
第四章:Timer定时器
4.1-概述(了解)
Java中的定时器,可以根据指定的时间来运行程序。
java.util.Timer
一种工具,线程用其安排以后在后台线程中执行的任务。可安排任务执行一次,或者定期重复执行。定时器是使用新建的线程来执行,这样即使主线程main结束了,定时器也依然会继续工作。
4.2-Timer定时器的使用
常用方法
- 构造方法:无参数。
- 定时方法:public void schedule(TimerTask task,Date firstTime,long period)
- TimerTask是定时器要执行的任务,一个抽象类,我们需要继承并重写方法run()
- firstTime定时器开始执行的时间
- period时间间隔,毫秒值
示例
public class Test{
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
int i = 0;
@Override
public void run() {
i++;
System.out.println(i);
}
},new Date(),1000);
}
}
第五章:ConcurrentHashMap
5.1-概述(了解)
java.util.concurrent.ConcurrentHashMap
支持获取的完全并发和更新的所期望可调整并发的哈希表。
此集合实现Map接口,因此Map集合中的所有功能都可以直接使用。
-
ConcurrentHashMap集合特点
- 底层是哈希表结构
- 此集合是线程安全的,但是某些功能不必锁定。比如get()
- 不会抛出ConcurrentModificationException并发修改异常
- 此集合支持遍历过程中添加,删除元素。
-
ConcurrentHashMap集合的锁定特点
- 为了提高效率,不会将整个集合全部锁定。
- 当添加或者移除元素时,是对链表进行操作,链表存储在数组中,那么就只会针对这个链表进行锁定。
5.2-迭代中添加元素(测试)
public static void main(String[] args) throws Exception {
Map<String,String> map = new ConcurrentHashMap<String, String>();
map.put("1","a");
map.put("2","b");
map.put("3","c");
System.out.println(map);
Set<Map.Entry<String,String>> set = map.entrySet();
Iterator<Map.Entry<String,String>> it = set.iterator();
while (it.hasNext()){
map.put("4","4");
Map.Entry<String, String> next = it.next();
System.out.println(next.getKey()+"="+next.getValue());
}
}
5.3-线程安全测试
public static void main(String[] args) throws Exception {
Map<String,Integer> map = new ConcurrentHashMap<String, Integer>();
Map<String,Integer> map = new HashMap<String, Integer>();
//存储2000个键值对
for(int x = 0 ; x < 2000; x++){
map.put("count"+x,x);
}
//开启线程,删除前500个
Runnable r1 = new Runnable() {
@Override
public void run() {
for(int i = 0 ; i < 500;i++){
map.remove("count"+i);
}
}
};
//开启线程,删除1000-1500个
Runnable r2 = new Runnable() {
@Override
public void run() {
for(int i = 1000 ; i < 1500;i++){
map.remove("count"+i);
}
}
};
new Thread(r1).start();
new Thread(r2).start();
//等待2秒,让2个线程全部运行完毕
Thread.sleep(2000);
//打印集合长度,线程安全集合应该是1000
System.out.println(map.size());
}