多线程
进程 操作系统管理最小单元
线程 CPU调度最小单元
-
如何停止线程
使用violate boolean变量来标识线程是否停止
停止线程时,需要调用停止线程的interrupt()方法,因为线程有可能在wait()或sleep(), 提高停止线程的即时性
对于blocking IO的处理,尽量使用InterruptibleChannel来代替blocking IO
sleep()方法由于中断而抛出异常,此时,它会清除中断标记,如果不加以处理,那么下一次循环开始时,就无法捕获这个中断,故在异常处理中,再次设置中断标记位
while (!Thread.currentThread().isInterrupted()) {
// ... do stuff ...
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
- 类锁 对象锁 显示锁
- 内置锁 synchronized 锁对象class和obj 分别对应类锁和对象锁
- 对象锁的粒度要比类锁的粒度要细,引起线程竞争锁的情况比类锁要少的多,所以尽量别用类锁,锁的粒度越少越好。
- 显示锁 ReentrantLock
- 内置锁 synchronized 锁对象class和obj 分别对应类锁和对象锁
内置锁和读写锁效率对比
//---------------内置锁------------------------
public synchronized String getStr(){
return "x";
}
public synchronized void setStr(String str){
//"x"
}
//----------------读写锁----------------------------
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock getLock = lock.readLock();//读锁
private final Lock setLock = lock.readLock();//写锁
public String getStr(){
getLock.lock();
try{
return "x";
}finally{
getLock.unlock();
}
}
public void setStr(String str){
setLock.lock();
try{
//"x"
}finally{
setLock.unlock();
}
}
//读写锁比互斥锁允许对于共享数据更大程度的并发。每次只能有一个写线程,但是同时可以有多个线程并发地读数据。ReadWriteLock适用于读多写少的并发情况。
-
生产者消费者模式
第一版 使用synchronized 保证生产者 生产完后消费者才获取到
导致问题: 生产者不断生产 生产完后消费者才能消费 需要实现: 生产者生产一个消费一个
public synchronized void put(String name){
id += 1;
//生产者生产一个完成
}
public synchronized void out(){
id -=1;
//消费者得到
}
第二版 增加标记
wait()、notify()和notifyAll()方法为什么要在synchronized代码块中?
在Object的wait()方法上面有这样一行注释:The current thread must own this object's monitor,意思是调用实例对象的wait()方法时,该线程必须拥有当前对象的monitor对象锁,而要拥有monitor对象锁就需要在synchronized修饰的方法或代码块中竞争并生成占用monitor对象锁。而不使用synchronized修饰的方法或代码块就不会占有monitor对象锁,所以在synchronized代码块之外调用会出现错误,错误提示为:
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at it.cast.basic.thread.SynchronizedTest.main(SynchronizedTest.java:27)
private boolean flag;
public synchronized void put(String name){
if(!flag){
id += 1;
//生产者生产一个完成
flag = true;
notifyAll();//唤醒wait()冻结的线程 如果没有就是空唤醒
wait();//当前线程冻结 释放CPU执行权去执行其它线程
}
}
public synchronized void out(){
if(flag){
id -=1;
//消费者得到
flag = false;
notifyAll();//唤醒wait()冻结的线程 如果没有就是空唤醒
wait();//当前线程冻结 释放CPU执行权去执行其它线程
}
}
管程法 生产者消费者和优化
private ArrayList<Integer> array= new ArrayList<>();
private Object creatorLocker = new Object();
public void put(int num){
synchronized (creatorLocker) {//必须先获得生产者锁才能生产
synchronized (array) {
while(array.size()>=5){
try{
array.wait();
}
}
array.add(num);
array.notify();
}
}
}
private Object consumerLocker = new Object();
public void get(){
synchronized (consumerLocker) {
synchronized (array) {
while(array.size()<1){
try{
array.wait();
}
}
int a = array.get(0);
array.notify();
}
return a;
}
}
Lock 生产者和消费者
private int number =0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void put(){
lock.lock();
try{
while(number >= 5){
condition.await();
}
number++;
condition.signalAll();
}finally{
lock.unlock();
}
}
public void get(){
lock.lock();
try{
while(number <= 0){
condition.await();
}
number--;
condition.signalAll();
}finally{
lock.unlock();
}
}
- ThreadLocal
- CAS
锁机制存在以下问题:
(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。
独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,Compare and Swap。
- 线程池
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数 核心线程和队列线程满了创建空闲线程
long keepAliveTime,//核心线程除外的空闲线程存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue,//有界阻塞队列 新任务放入等待调度
ThreadFactory threadFactory,//创建新线程的工厂类
RejectedExecutionHandler handler)//拒绝策略 达到最大线程数限制
工作队列
- ArrayBlockingQueue 基于数组有界阻塞队列 新任务加入队尾 队列满则创建新线程到maximumPoolSizes数量
- LinkedBlockingQueue 最大容量为Interger.MAX 新任务一直存在该队列 不会创建新线程到maximumPoolSizes数量 相当于maximumPoolSize参数不起作用
- SynchronousQueue 不缓存阻塞队列 生产者放入一个任务必须等到消费者取出这个任务 没有线程则创建线程到maximumPoolSizes数量
- PriorityBlockingQueue 优先级队列 优先级通过Comparator实现
拒绝策略 工作队列任务达到最大限制 超过maximumPoolSizes数量 则执行拒绝策略
- CallerRunsPolicy 直接执行被拒绝任务run方法 除非线程池已经shutdown 则直接抛弃任务
- AbortPolicy 直接丢弃任务 并抛出RejectedExecutionExecption异常
- DiscardPolicy 直接丢弃任务 什么都不做
- DiscardOldestPolicy 抛弃进入最早的任务 尝试将这次拒绝的任务放入队列
线程池提交任务执行顺序
- 线程数<核心线程数 创建线程(线程为核心线程)
- 线程数>核心线程数 队列未满 将任务放入队列中
- 线程数>核心线程数 队列已满
- 线程数<最大线程数 创建线程
- 线程数>最大线程数 启用拒绝策略
- 一个线程执行完 从队列中取出一个任务执行
- 线程执行最后空闲线程根据空闲时间停掉 最终收缩到核心线程数
-
BlockingQueue阻塞队列
- ArrayBlockingQueue
方式 抛出异常 有返回值不抛出异常 阻塞等待 超时等待 添加 add() offer() put() offer 移除 remove() poll() take() poll() 检测队首元素 element() peek() - - -
SynchronousQueue同步队列
- 没有容量 put进入一个元素 必须等待 take取出来 才能进行下一个put元素操
-
线程池的使用
降低资源的消耗
提高响应速度
-
方便管理
线程复用 可以控制最大并发数 管理线程
Executors.newSingleThreadExecutor();//单线程 Executors.newFixedThreadPool(5);//固定线程池大小 Executors.newCachedThreadPool();//可伸缩 遇强则强 遇弱则弱 //使用完调用shutdown关闭
线程池不允许使用Executors去创建, 而是通过ThreadPoolExecutor的方式
-
FixedThreadPool和SingleThreadPool
允许的请求队列长度为Integer.MAX_VALUE 可能会堆积大量的请求 从而导致OOM
-
CachedThreadPool和ScheduledTreadPool
允许的创建线程数量为Integer.MAX_VALUE, 可能会创建大量的线程, 从而导致OOM
- 自定义线程池 ThreadPoolExecutor
- 最大线程数如何定义
- CPU密集型 保持CPU效率最高
- Runtime.getRuntime().availableProcessors()
- IO密集型
- 设置数量为>程序中十分耗IO资源的线程数
- CPU密集型 保持CPU效率最高
- 最大线程数如何定义
-
函数式接口
- 一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口
//一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口 @FunctionalInterface interface GreetingService{ void sayMessage(String message); }
-
Stream流式计算
/** * 现在有5个用户 筛选 * 1 id必须偶数 * 2 年龄>23 * 3 用户名转为大写字母 * 4 用户名字字母倒排 * 5 只输出一个用户 * @param args */ @RequiresApi(api = Build.VERSION_CODES.N) public static void main(String[] args) { User a = new User(1, "a", 21); User b = new User(2, "b", 22); User c = new User(3, "c", 23); User d = new User(4, "d", 24); User e = new User(5, "e", 25); User f = new User(6, "f", 26); List<User> users = Arrays.asList(a, b, c, d, e,f); //计算交给Stream users.stream() .filter(user -> user.id % 2 == 0) .filter(user -> user.age > 23) .map(user -> user.name.toUpperCase())//返回name // .sorted(Comparator.reverseOrder())//name调用 .sorted((name1, name2) -> name2.compareTo(name1))//name调用 .limit(1) .forEach(System.out::println); // .forEach(s -> System.out.println(s)); }
- ForkJoin
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@RequiresApi(api = Build.VERSION_CODES.N)
public static void main(String[] args) {
forkTest();
//stream并行流
stream();
}
@RequiresApi(api = Build.VERSION_CODES.N)
private static void stream() {
long sum3 = LongStream.rangeClosed(0L, 10000000L).parallel().reduce(0, Long::sum);
}
private static void forkTest() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo demo = new ForkJoinDemo(0L, 100000000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(demo);
try {
Long sum = submit.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//或者
forkJoinPool.execute(demo);
try {
Long sum2 = demo.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//计算方法
@Override
protected Long compute() {
if ((end-start)>temp){//超过分支
long mid = (start + end) / 2;//中间值
ForkJoinDemo demo1 = new ForkJoinDemo(start, mid);
demo1.fork();//拆分任务 将任务加入线程队列
ForkJoinDemo demo2 = new ForkJoinDemo(mid + 1, end);
demo2.fork();
return demo1.join()+demo2.join();
}else {
long sum=0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}
}
- Future异步调用
public static void main(String[] args) {
//发起一个请求 无返回值 runAsync
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("runAsync");
});
try {
completableFuture.get();//获取阻塞执行结果
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
//线程顺序执行
Thread thread1 = new Thread(()-> System.out.println(Thread.currentThread().getName()),"thread-1");
Thread thread2 = new Thread(()-> System.out.println(Thread.currentThread().getName()),"thread-2");
Thread thread3 = new Thread(()-> System.out.println(Thread.currentThread().getName()),"thread-3");
CompletableFuture.runAsync(thread1::start).thenRun(thread2::start).thenRun(thread3::start);
//有返回值 supplyAsync
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync");
return 1;
});
//异步回调
System.out.println(CompletableFuture.whenComplete((t, u) -> {
System.out.println("whenComplete" + " " + t + " " + u);
}).exceptionally((e) -> {
return 400;
}).get());
}
- JMM java内存模型
关于JMM的一些同步约定
1. 线程解锁前 必须将共享变量立即刷回主存
2. 线程加锁前 必须读取主存中的最新值到工作内存中
3. 加锁和解锁是同一把锁
java内存模型定义了8种操作来完成:
- lock(锁定):作用于主内存,它把一个变量标记为一条线程独占状态;
- unlock(解锁):作用于主内存,它将一个处于锁定状态的变量释放出来,释放后的变量才能够被其他线程锁定;
- read(读取):作用于主内存,它把变量值从主内存传送到线程的工作内存中,以便随后的load动作使用;
- load(载入):作用于工作内存,它把read操作的值放入工作内存中的变量副本中;
- use(使用):作用于工作内存,它把工作内存中的值传递给执行引擎,每当虚拟机遇到一个需要使用这个变量的指令时候,将会执行这个动作;
- assign(赋值):作用于工作内存,它把从执行引擎获取的值赋值给工作内存中的变量,每当虚拟机遇到一个给变量赋值的指令时候,执行该操作;
- store(存储):作用于工作内存,它把工作内存中的一个变量传送给主内存中,以备随后的write操作使用;
- write(写入):作用于主内存,它把store传送值放到主内存中的变量中.
volatile的理解
volatile是轻量级同步机制 保证可见性 不保证原子性 禁止指令重排
抛出问题: 子线程不知道主存的变化
使用volatile关键字会强制将修改的值立即写入主存,只要有一个线程将变量的值改了,马上就会同步到内存,其他的线程马上就可以得到这个改过后的值
- CAS
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
//参数1期望值 达到了就更新为 参数2更新值
atomicInteger.compareAndSet(1,2);
System.out.println(atomicInteger.get());
//自旋锁
atomicInteger.getAndIncrement();
System.out.println(atomicInteger.get());
atomicInteger.compareAndSet(3,4);
System.out.println(atomicInteger.get());
//Unsafe类 通过这个类操作内存
}
缺点:
1. 循环耗时
2. 一次性只能保证一个共享变量的原子性
3. ABA问题
带版本号原子引用解决ABA问题
AtomicRefrence
AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(初始值, 初始版本号);
-
各种锁的理解
公平锁 不插队
-
非公平 可以插队 (默认都是非公平)
public ReentrantLock(boolean fair){ sync = fair? new FairSync() : new NonfairSync(); }
可重入锁
不可重入锁
-
自旋锁
AtomicReference<Thread> atomicReference = new AtomicReference<>(); //加锁 public void myLock(){ Thread thread = Thread.currentThread(); //自旋锁 while (!atomicReference.compareAndSet(null,thread)){ } } public void myUnlock(){ Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread,null); }
-
死锁
jps查询进程信息 jstack查询堆栈信息排查死锁原因
java.lang.management 接口 ThreadMXBean
线程管理类接口