Lock和synchronized的区别和使用
https://blog.csdn.net/ydk888888/article/details/78625333
ThreadLocal
线程本地变量,get()、set()、remove()
get()是先拿到当前线程的ThreadLocalMap对象,然后再在map找当前threadlocal对应的值。
map中的entry继承了WeakReferance使用threadlocal做键值,
弱引用,当没有强引用指向threadlocal时,下一次回收时会直接被回收。使用弱引用和软引用来避免OOM
https://www.cnblogs.com/dolphin0520/p/3784171.html
import java.lang.ref.WeakReference;
public class Main {
public static void main(String[] args) {
WeakReference<String> sr = new WeakReference<String>(new String("hello"));
System.out.println(sr.get());
System.gc(); //通知JVM的gc进行垃圾回收
System.out.println(sr.get());
}
}
阻塞同步:互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步(Blocking Synchronization)
非阻塞同步:
通过控制变量值的版本来保证CAS的正确性。但是大部分情况下ABA问题不会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。
同步和异步
同步和异步通常用来形容一次方法调用。同步方法调用需要等待方法返回后才能继续后续的行为。调用异步方法可以另起一个线程去执行它,当前线程继续执行。如果这个异步调用需要返回结果,它执行完后会通知调用者。并发和并行
并发是多个任务在同一个时间段交替执行,宏观上是同时执行。
并行是多个任务在同一个时间段同时执行,是真正意义上的同时执行。
一般多核或多cpu能多线程并行执行。单核单cpu只能并发。临界区
临界区用来表示一种公共资源,每次只能由一个线程使用,其他线程必须等待这个线程用完才能访问它。阻塞和非阻塞
通常用来形容多线程间的相互影响。
占用临界区资源的线程会阻塞其他请求该资源的线程,IO。
非阻塞强调线程不会妨碍其他线程执行,NIO。-
死锁、饥饿和活锁
多线程活跃性问题。产生这三种情况之一说明该线程很难继续往下执行。- 死锁:线程A占用资源1请求资源2,线程B占用2请求1。AB互不相让死磕到底。
(1)互斥条件:进程对所分配到的资源不允许其他进程进行访问,若其他进程访问该资源,只能等待,直至占有该资源的进程使用完成后释放该资源
(2)请求和保持条件:进程获得一定的资源之后,又对其他资源发出请求,但是该资源可能被其他进程占有,此事请求阻塞,但又对自己获得的资源保持不放
(3)不可剥夺条件:是指进程已获得的资源,在未完成使用之前,不可被剥夺,只能在使用完后自己释放
(4)环路等待条件:是指进程发生死锁后,必然存在一个进程--资源之间的环形链 - 活锁:AB都太谦让,主动将资源释放给别人导致自己一直无法拿到所有的资源。A放了1;B放了2 --> A拿到2;B拿到1 -->A放2;B放1 -->...
- 饥饿:指一个线程因优先级低等原因一直无法请求到所需资源。资源一直在别的线程那就是不到线程A手里。
- 死锁:线程A占用资源1请求资源2,线程B占用2请求1。AB互不相让死磕到底。
-
并发级别
阻塞、无饥饿、无障碍、无锁、无等待- 阻塞:sychoronized关键字、重入锁
- 无饥饿:公平锁
- 无障碍:乐观的策略,检查到有冲突时就回滚。
- 无锁:保证必然有一个线程在有限步内完成操作离开临界区,CAS
- 无等待:要求所有线程在有限步内完成操作。RCU(Read-Copy-Update),对数据的读不加控制,写时先取得原始数据的副本,接着只修改副本数据,修改完后在合适的时机会写数据。(主内存工作内存)
-
JMM java内存模型
Java内存模型
《深入理解JAVA虚拟机》
再有人问你volatile是什么,就把这篇文章发给他
单例模式singleton为什么要加volatile为了保证多个线程间可以有效地、正确地协同工作。
主内存,工作内存,八个内存交互操作,操作规则,内存屏障。- 所有变量都存储在主内存中,每条线程还有自己的工作内存。
- 线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量。
- 不同线程之间无法直接访问对方工作内存中的变量。
lock-unlock、store-write、read-load、use、assign
- lock:作用与住内存的变量,把变量标识为一条线程独占状态
- unlock:释放处于锁定状态的变量,释放后的变量才可以被其他线程锁定。
- read:把主内存变量传输到工作内存
- load:把read的到的值放入工作内存的变量副本中
- store:把工作内存的变量传送到主内存
- write:把store来的值放到主内存中。
- use:把工作内存的变量传给执行引擎
-
三个特性
- 原子性:一个操作不可中断。32位系统对long的读写不是原子性的。
- 可见性:变量被修改后,其他线程能都立即知道这个修改。
缓存或硬件优化、指令重排、编辑器的优化.
在一个线程中去观察另一个线程的变量,它们的值是否能观测到、何时能观测到是没有保证的。 - 有序性:指令重排
指令重排是为了性能考虑,减少流水线中断。 - Happen-Before规则
这些规则下指令不能重排:- 程序顺序原则:一个线程内保持语义的串行性
- volatile原则:volatile变量的写先发生于读
- 锁规则:unlock必然发生在随后的lock前
- 传递性:
- 线程的start()先于它的每一个动作
- 线程的所有操作先于线程的终结(Thread.join())
- 线程的中断(interrupt())先于被中断线程的代码
- 对象的结束先于finalize()方法。
线程和进程
进程:运行时的程序,系统分配资源和调度的基本单位。
线程:轻量级进程,是程序的最小执行单位。进程可容纳多个线程。
使用多线程而不是多进程去设计并发程序,是因为线程间的切换和调度的成本远远小于进程。-
线程的生命周期
线程的生命周期- 新建(New Thread)
当创建一个Thread对象时,线程进入新建状态(未被启动)。
Thread t1 = new Thread(); - 就绪(Runnable)
线程已经被启动,所有资源都已经准备好了,等待分配CPU时间片。
t1.start(); - 运行(Running)
就绪线程获得CPU资源,正在执行任务。
run() - 阻塞(Blocked)
正在运行的线程因某种原因让出cpu,暂停执行。
阻塞-->就绪-->运行
被阻塞的线程解除阻塞后,必须等待线程调度器再次调度它- sleep(long mills) 让出cpu资源
- 线程调用一个阻塞式IO方法,在该方法返回之前,该线程被阻塞
- 线程试图获得一个同步监视器(锁),但该锁正被其他线程所持有
- wait() 释放所有资源
- suspend() 将线程挂起(容易导致死锁,已被废弃)
- 死亡(Dead)
- run()执行完,线程正常结束
- 线程抛出一个未捕获的Exception或Error
- stop()结束线程, 容易死锁
- 新建(New Thread)
-
Thread状态
- NEW:才new出来
- RUNNABLE: 正在执行
- BLOCKED: 没拿到锁
- WAITING: 无时限的等待wait join
- TIMED_WAITING:有时限的等待sleep
- TERMINATED:线程执行完
-
终止线程
有时线程执行体是个无穷循环,需要手动终止。stop()
Thread的final实例方法,废弃。
直接终止线程,释放锁。容易引起数据不一致的问题。
比如数据写到一半的时候被强行终止了,其他的线程拿到了锁读到了这个被写坏的数据。(数据库事务回滚引发的脏读。。。不对不一样线程的是固定得了,数据库的只是特定时间事务读到脏数据)自行决定何时退出线程
设置一个volatile变量,在执行体的循环合适的位置中查看这个值,确定是true的话break退出循环。
所以想要在什么时候退出只需要修改这个变量的值就可以了。-
线程中断 (JDK提供的支持)
发送中断通知给线程,线程收到后自行决定如何处理。- interrupt():设置中断标志位
- isInterrupted():通过检查中断标志位,判断是否被中断
- interrupted():判断中断状态,并清除中断标志位状态
都是Thread的实例方法。
中断比设一个变量更强,因为若线程wait或sleep了就只能通过中断来终结这个线程了。
sleep时被中断会抛出InterruptedException异常并清除中断状态,所以需要在捕获这个异常的时候再设置一次中断状态位。
public void run(){
while(true){
if(Thread.currentThread().isInterrupted()){
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//设置中断状态
Thread.currentThread().interrupt();
}
Thread.yield();
}
}
wait()和notify()
java根类Object的方法suspend()和resume()
suspend和resume是Thread的实例方法
suspend():挂起,不释放锁资源。废弃
resume():唤醒,若是不小心在suspend()前先执行掉了,挂起线程就很难有机会继续执行了。
suspend的线程状态显示的是runnable,这样很难找出错误的地方。
可用Object的wait和notify代替实现suspend功能。-
yield()和join()
join():实例方法,阻塞当前线程等待目标线程完成。
原理 thread.join() --> thread.wait(),thread线程退出前会执行notifyAll()。
因为join是用的wait实现的,所以要尽量避免用线程实例调用wait,notify,影响系统api或者被系统api所影响。yield():Thread的native静态方法,让出当前线程的cpu,让出后还会进行争夺。(应该是不会释放其他资源的吧)
volatile
修饰易变的、不稳定的变量。让其他线程能够看到变量的改动。
写发生在读之前。
禁止重排序,变量位置不变。
Client模式,没优化
Server模式,有优化线程组
如果线程数量很多且线程分工明确,可以将线程放置在同一个线程足内。
tg = new ThreadGroup("PrintGroup")
new Thread(tg, runnable, name);
activeCount() 获得组内活跃的线程数量,一个估计值
list() 打印所有线程信息
守护线程(Daemon)
后台线程,如垃圾回收线程、JIT线程等。如果一个java进程只剩下守护线程的话,jvm就会自然退出。
thread.setDaemon(true) 将thread设为守护线程。用户进程执行完后,就算它还在执行也会退出。必须在start前设置,否则会抛异常并当作用户线程。线程优先级
线程的优先级调度与底层操作系统有关,所以在各个平台上表现不一,产生的后果也可能不容易预测,无法精确控制。
/**
* The minimum priority that a thread can have.
*/
public final static int MIN_PRIORITY = 1;
/**
* The default priority that is assigned to a thread.
*/
public final static int NORM_PRIORITY = 5;
/**
* The maximum priority that a thread can have.
*/
public final static int MAX_PRIORITY = 10;
java的线程优先级,数字越大,优先级越高。范围1到10
thread.setPriority();
高优先级的线程大部分情况下都会首先完成任务。
-
synchronized
- 同步块指定加锁对象
- 直接作用于实例方法,对当前实例加锁
- 直接作用于静态方法,对当前类加锁
可重入
public class Test implements Runnable{
static int i=0;
public static void main(String[] args){
Test t = new Test();
Thread t1 = new Thread(t,"t1");
Thread t2 = new Thread(t,"t2");
//正确的案例,两个线程指向同一个Runnable实例,关注同一个对象锁
// Thread t1 = new Thread(new Test(),"t1");
// Thread t2 = new Thread(new Test(),"t2");错误案例,两个线程关注不同的对象锁
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}
private synchronized void increace(){
if(i<100000){
i++;
}
}
public void run(){
while(i<100000){
increace();
}
}
}
-
错误的加锁
给引用会变的对象加锁导致锁失效,如包装类和String等。
Integer在-128~127是同一个对象但是其他情况不一定。
如果将Integer i = 129;对象当作锁,
i++;后i指向另一个对象 Integer.valueOf(i.intValue()+1);
...............
synchronized关键字经过编译后,会在同步块的前后分别形成moniterenter和mointerexit两个字节码指令,这两个指令都需要一个reference引用类型的参数来指明要锁定和解锁的对象。
在执行moniterenter指令时,首先要尝试获取对象的锁,若这个对象没被锁定或当前线程已经拥有这个对象的锁,把锁的计数器加1。相应的,在执行mointerexit是,会将计数器减1,当计数器为0时,锁被释放。若获取对象锁失败,则当前线程就要阻塞等待,直到对象锁被另外一个线程释放。
对于线程的阻塞和唤醒,需要用户态和核心态切换,状态切换需要很多处理器时间。
JDK并发包
Lock
lock中定义了4中方法来获取锁。
lock必须手动释放锁,所以需要在trycatch中使用并在finally中释放锁避免死锁。
void lock();
阻塞等待锁
void lockInterruptibly() throws InterruptedException;
在等待锁的过程中可响应中断
boolean tryLock();
尝试获取锁,会立即返回
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
有等待时间的获取锁
void unlock();
释放锁
Condition newCondition();
-
重入锁
jdk5重入锁性能远远大于synchronized,但jdk6.0开始,synchronize被做了大量的优化,使得两者性能差距并不大。
java.util.concurrent.locks.ReentrantLock
implements Lock, java.io.Serializable
必须手动指定合适加锁、何时释放锁,灵活性远远大于synchronized
lock.lock()
lock.unlock() 放finally里重入
同一个线程多次获得同一个锁,必须释放相同次数的锁。中断响应,取消对锁的请求
public void lockInterruptibly() throws InterruptedException
可解决死锁
lock.lockInterruptibly() 可以对中断进行响应的锁请求动作
注意:捕获到中断异常后还需要手动释放锁,毕竟响应中断的结果只是抛出InterruptedException而已。-
锁申请等待限时
public boolean tryLock(long timeout, TimeUnit unit) throws
InterruptedExceptiontimeout 等待时长;unit 计时单位;
if(lock.trylock(5,TimeUnit.SECONDS)){}TimeUnit.NANOSECONDS;
TimeUnit.MIRCOSECONDS;
TimeUnit.MILLISECONDS;
TimeUnit.SECONDS;
TimeUnit.MINUTES;
TimeUnit.HOURS;
TimeUnit.DAYS; 公平锁
一个线程会倾向于再次获取已经持有的锁,这种分配方式高效但是不公平。
公平锁使先申请的线程先得到锁,不会产生饥饿现象。
public ReentrantLock(boolean fair)
实现公平锁需要维护一个有序队列,成本高性能低,因此默认是非公平的。原子状态(CAS操作);阻塞原语park()和unpark();等待队列;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public void lock() {//加锁
sync.lock();
}
public void unlock() {//释放锁
sync.release(1);
}
public void lockInterruptibly() throws InterruptedException {//响应中断
sync.acquireInterruptibly(1);
}
public boolean isHeldByCurrentThread() {//是否持有锁
return sync.isHeldExclusively();
}
public boolean tryLock() {//尝试获得锁,该方法不等待
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {//锁等待限时
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public ReentrantLock(boolean fair) {//公平锁,构造函数
sync = fair ? new FairSync() : new NonfairSync();
}
try{
lock1.lock();
lock2.lock();
} finally {
//判断该锁是否被当前线程所持有,同个线程申请多个不同的锁的情况下使用。
if(lock1.isHeldByCurrentThread()){
lock1.unlock();
}
if(lock2.isHeldByCurrentThread()){
lock2.unlock();
}
}
-
Condition条件,和重入锁配合
通过Lock 的Condition newCondition()方法生成一个与当前重入锁绑定的Condition
利用Condition对象实现wait notify功能。- await():与wait()相似,使当前线程等待,释放锁,等待其他线程notity它。可响应线程中断。
- awaitUninterruptibly():不响应中断
- singal():唤醒一个等待的线程
- singalAll():唤醒所有等待的线程
注意:等待唤醒都要先拿到锁 lock.lock()
ArrayBlockingQueue的内部实现
/** The queued items */
final Object[] items;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;//take -- 队列为空,让线程等待。
/** Condition for waiting puts */
private final Condition notFull;//put -- 队列已满,让线程等待
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //队列已满,让线程等待。
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); //唤醒一个需要take的线程
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //队列为空,让线程等待。
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //唤醒一个需要put的线程
return x;
}
-
信号量(Semaphore)
允许多个线程同时访问,是对锁的扩展。
public Semaphore(int permits) //指定能同时申请几个许可
public Semaphore(int permits, boolean fair) //指定是否公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire() throws InterruptedException {//尝试获得许可,若无法获得,则线程会等待,知道有线程释放一个许可或者当前线程被中断。
sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {//不响应中断
sync.acquireShared(1);
}
public boolean tryAcquire() {//尝试获得一个许可,成功返回true,失败返回false
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {//
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void release() {//释放一个许可
sync.releaseShared(1);
}
public void acquire(int permits) throws InterruptedException {//尝试获得多个许可
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
-
ReadWriteLock读写锁
JDK5的读写分离锁,允许多个线程同时读,但是写写操作和读写操作依然需要锁。public ReentrantReadWriteLock()
public ReentrantReadWriteLock.WriteLock writeLock()
public ReentrantReadWriteLock.ReadLock readLock()ReadWriteLock适用于读多写少
- 读读不互斥
- 读写互斥,读阻塞写,写阻塞读
- 写写互斥,写阻塞写
api文档里的例子
class RWDictionary {
private final Map m = new TreeMap();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}}
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}}
-
倒计时器:CountDownLatch
让某一个线程等待直到倒计时结束
public CountDownLatch(int count)
public void countDown()
public void await() throws InterruptedException
public CountDownLatch(int count) {//设定计数个数
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void countDown() {//每执行一次计数减一
sync.releaseShared(1);
}
public void await() throws InterruptedException {//当前线程等待计数完
sync.acquireSharedInterruptibly(1);
}
-
循环栅栏:CyclicBarrier
可以反复使用的计数器
public CyclicBarrier(int parties, Runnable barrierAction)
public int await() throws InterruptedException, BrokenBarrierException
/**
*
* @param parties 计数总数即参与的线程总数
* @param barrierAction 当一次计数完成即执行完parties个线程时,由最后一个线程执行这个动作。
* @throws IllegalArgumentException 计数总数小于1时抛出
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//计数减一,挂起线程;若计数为0则唤醒所有线程并重置计数器
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;//内部用重入锁
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;//一个线程执行一次计数减一
if (index == 0) { // 如果计数为0了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //执行之前指定的动作BarrierAction
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier(); //重置计数器,唤醒所有等待的线程
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//无限循环以防恶意唤醒时执行不了该做的动作 等
try {
if (!timed)
trip.await(); //让当前线程等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
api文档例子
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);
List threads = new ArrayList(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}}
-
线程阻塞工具类:LockSupport
可以在线程内任意位置让线程阻塞
和Thread的supsend比,弥补了resume发生在前导致永久挂起的情况,线程状态是waiting
和Object的wait比,它不用获得锁,不会抛出中断异常
内部由Unsafe实现public static void park():挂起线程,不释放资源
public static void unpark(Thread thread)之所以不会永久挂起且不用锁,是因为它给每个线程准备了一个许可。
- 如果许可可用,park会返回并将许可设为不可用;
- 如果许可不可用,park不会返回;
- unpark将许可设为可用。
park可响应中断并立即返回。
-
线程复用:线程池
大量线程的创建会耗尽CPU和内存资源,线程池可以复用创建的线程
ThreadPoolExecutor 线程池
- newFixedThreadPool()方法,该方法返回一个固定线程数量的线程池。
- newSingleThreadExecutor()方法,返回一个只有一个线程的线程池。
- newCachedThreadPool,线程数量不确定,0~MAX,有空闲线程可以复用,会优先使用可复用的线程。若所有线程都在工作,又有新的任务提交,则会创建新的线程。
ExecutorService es = Executors.newFixedThreadPool(5);
es.submit();
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
/**核心线程数量**/
int maximumPoolSize,
/**池中最大线程数量**/
long keepAliveTime,
/**当池中数量超过核心线程数时,多余的空闲线程的存活时间**/
TimeUnit unit,
/**时间单位 **/
BlockingQueue<Runnable> workQueue,
/**任务队列,存放被提交但还未执行的任务 **/
ThreadFactory threadFactory,
/**用于创建线程的线程工厂 **/
RejectedExecutionHandler handler) {
//线程池已满、没有空闲线程且任务队列也满了时,对提交的任务的拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
BlockingQueue<Runnable> workQueue
new SynchronousQueue<Runnable> () 直接提交的队列
提交的任务不会被保存,直接交给线程执行;
如果没有空闲线程则创建新的线程;
如果线程数量达上限则执行拒绝策略。ArrayBlockingQueue<Runnable> (int) 有界的队列
有界队列仅在任务队列满时,才可能将线程数提升到corePoolSize以上。除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。
若线程池数量小于核心线程数量,优先创建新的线程;(空闲线程)
若大于,则加入任务队列;
若队列满了,则创建线程;
若线程池满了,执行拒绝策略。
- LinkedBlockingQueue 无界任务队列
除非系统资源耗尽,否则不存在任务入队失败的情况。线程数不会大于corePoolSize,若任务创建和处理的速度差异很大,无界队列会保持快速增长,直至内存耗尽
- PriorityBlockingQueue 优先队列
特殊的无界队列,根据任务自身的优先级顺序先后执行。
总体来说都是以运行线程数小于核心线程数,分配线程-->已达核心线程数,进队列-->队列已满,分配线程-->已达最大线程数,执行拒绝策略的顺序来处理的。
有界队列就是这个顺序;
而直接提交队列,相当于没有队列,所以是分配线程(分配空闲线程或创建新线程)-->已达最大线程数,执行拒绝策略的顺序;
无界队列则相当于没有最大线程数和拒绝策略,所以是小于核心线程数,分配线程-->已达核心线程数,进队列的顺序。
线程池的核心调度代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果正在运行的线程数量小于核心线程数量,分配线程执行该任务(有空闲线程分配空闲线程,没有的话创建新线程);否则提交到等待队列
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 进入队列成功,等待执行;进入队列失败(队列已满或是SynchronousQueue),则将任务提交给线程池
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 如果线程数目小于maximumPoolSize,分配线程执行;否则执行拒绝策略
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//workerCountOf 正在运行的线程数量
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
RejectedExecutionHandler handler
- AbortPolicy 抛异常 默认的拒绝策略
- CallerRunsPolicy 只要线程池未关闭,直接在任务提交线程运行该任务,这样该线程性能会急剧下降
- DiscardOledestPolicy 丢弃队尾任务(最老的一个任务)
- DiscardPolicy 丢掉这个任务
- 可自己扩展RejectedExexutionHandler接口实现rejectExecution(Runnable r, ThreadPoolExecutor excutor)处理这个任务
ThreadFactory threadFactory
ThreadFactory是一个接口只有一个方法用来创建线程,线程池用这个方法新建线程。
Thread newThread(Runnable r);
扩展线程池
beforExecute()
afterExecut()
terminated()
关闭线程池
shutdown() 不再接受新任务,等待所有任务执行完毕后再关线程池
优化线程池线程数量
Runtime.getRuntime.availableProcessors() 获得系统可用cpu数量
Nthread = NcpuUcpu(1+W/C)** 大概是可用的cpu数加等待的cpu数?
Ncpu:cpu数量
Ucpu:目标cpu的使用率
W/C:等待时间和计算时间的比率
线程池sumbit很可能吃掉异常,改用execute
Executors 线程池工厂
//固定线程数+无界队列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//单个线程+无界队列
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//无限线程+直接提交(无等待队列)+空闲线程60s死
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Fork/Join框架
fork()后多一个线程分支;join()阻塞当前线程等待目标线程完成。
ForkJoinPool线程池,将fork方法将给它进行处理。
每个线程拥有一个线程队列
互帮互助,帮其他线程从队列尾部拿任务来处理,自己从自己队头拿任务处理。
分而治之。
悲观锁和乐观锁
Java并发问题--乐观锁与悲观锁以及乐观锁的一种实现方式-CAS
悲观锁,悲观的认为数据一定会被并发访问,所以直接给数据加锁,更新时必须先获得锁。
乐观锁,乐观的认为数据不会被并发修改,所以不会加锁,但是访问数据时需要通过检测是否被修改了,如果修改了就返回错误让用户决定该怎么办。一种实现是CAS。Java中java.util.concurrent.atomic包下面的原子变量类。CAS
concurrent包完全建立在cas上callable 的call方法有返回值,并且可以抛异常,且返回值可以被future接收。
双重检查加锁的单例模式要加volatile是因为有可能这一句
instance = new Singleton();
被重排序了,再还没new 完的时候就给instance赋值了,那这样另一个线程判断到有值就直接执行返回语句return instance;
会有对象尚未初始化错误。Callable与Runable接口 submit与execute区别
Callable有返回值,并且它的call方法只能通过ExecutorService的sumbit()执行,返回一个Future对象,通过调用它的get方法可获得线程的执行结果即call方法返回的结果若线程没有完成,get方法会阻塞在那,可以通过get获得线程抛出的异常。
Runnable没有返回值,就算通过sumbit方法获得了future,调用get返回的回是null。
callable好处:
可了解任务执行情况,可取消任务的执行?,可获得任务执行的结果,可抛异常。
FutureTask是Future的实现类。FutureTask<Integer> result = new FutureTask<>(callable实现类的对象);
new Thread(result).start();
另外,runnable可用lambda表达式,
(方法参数)->{功能逻辑(方法内容)}
new Thread( ()->{...do something...} ).start();
创建线程的4个方法
- A extends Thread() overload run() -->new A().start();
- new Thread(new Runable(){overload run()}).start();
- new Thread(new Future( new Callable){overload call()} ).start();
- ExecutorService es = Executors.newFixedThreadPool();es.submit(new Callable or Runable);