reentrantlock
实现了Lock接口
可以用于替换synchronized,比synchronized更灵活
必须手动释放锁
如果synchronized锁定遇到异常的话,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
如果一个线程等待时间过长,可以调用lockInterruptibly方法,可以对线程interrupt方法做出相应
reentrantlock可以指定为公平锁,默认为非公平锁
公平锁是谁等的时间长,谁获得锁,公平锁效率低
原因:在恢复一个被挂起线程与该线程真正开始运行之间存在着严重的延迟
如果线程A持有一个锁,线程B请求这个锁。由于这个锁已经被A持有,因此B被挂起。当A释放锁时,B被唤醒了,因此会再次尝试获取锁,与此同时如果C也请求这个锁,那么C很有可能在B唤醒之前获得,使用以及释放这个锁,这就是双赢的情况:B获得锁的时间没有被推迟,C更早地获取了锁,并且吞吐量也提高了
synchronized默认为非公平锁
接口lock,实现了比使用synchronized方法和语句可获得的更广泛的锁定操作
tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。
带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。
lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。
lockInterruptibly,调用后一直阻塞直到获得锁,但是接受中断信号
reentrantlock如何实现可见性
lock的可见性是其父类中的aqs AbstractQueuedSynchronizer的volatile修饰的state带来的。
reentrantlock和synchronized之间的选择
reentrantlock的危险性比同步机制要高,如果忘记在finally中调用unlock,那么很可能会有严重的后果。当内置锁不能满足需求的时候,才应该使用reentrantlock,这些高级功能包括:可定时的,可轮循的与可中断的锁获取,公平队列,以及非块结构所(分段锁)
读写锁 ReadWriteLock接口
public interface ReadWriteLock{
Lock readLock();
Lock writeLock();
} 允许多个读操作,或者一个写操作,这两个操作不能同时进行
读写锁面临的几个问题:
**释放优先**:写入锁释放锁时,读和写的进程同时存在,应该选择读还是写
- 如果这个锁由读线程持有,而另一个线程请求写入锁,那么其他读线程都不能获取读锁,直到写线程使用完,释放锁
**读线程插队**:如果锁由读线程持有,别的读线程是否可以插队,如果允许读插到写的前面,那么可能出现饿死的情况
**重入锁**:读和写锁是否可以重入
- 可以重入
**降级锁**:一个线程有写入锁,是否能不释放该锁的情况下获取读取锁
- 写线程降级为读线程可以,即先获取写锁,再获取读锁,这时候释放写锁,即降级,读不可以升级为写(会导致死锁)
**升级**:读取锁能否优于其他正在等待的读和写线程,升级为一个写入锁
读写锁使用32位的int类型来表示锁被占用的线程数(Reentrantlock的state),如果一个整型变量行维护多种状态,就需要“按位切割”这个变量,高16位读状态的线程占有数,低16位的写锁被同一个线程的申请次数
上图是一个划分图,表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。
读写锁是通过位运算迅速确定读和写各自的状态。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。
根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。
为什么只有降级锁,没有升级锁?
只要线程获取写锁,那么这一刻只有这一个线程可以在临界区操作,它自己写完的东西,自己的是可以看见的,所以写锁降级为读锁是非常自然的一种行为,并且几乎没有任何性能影响,但是反过来就不一定行的通了,因为读锁是共享的,也就是说同一时刻有大量的读线程都在临界区读取资源,如果可以允许读锁升级为写锁,这里面就涉及一个很大的竞争问题,所有的读锁都会去竞争写锁,这样以来必然引起巨大的抢占,这是非常复杂的,因为如果竞争写锁失败,那么这些线程该如何处理?是继续还原成读锁状态,还是升级为竞争写锁状态?这一点是不好处理的,所以Java的api为了让语义更加清晰,所以只支持写锁降级为读锁,不支持读锁升级为写锁。
另外:
如果我们有线程A和线程B,线程A和B都获取了读锁,线程A这时候升级为写锁,线程B也升级为写锁,这时由于写是互斥的,那么线程A就需要等待线程B释放自己的读锁,线程B也在等待线程A释放读锁,因为不同线程的读和写不能同时存在,这样就出现了死锁。
public class Main {
Lock lock =new ReentrantLock();
void m1(){
try{
lock.lock();
for(int i =0;i<10;i++){
System.out.println(i);
try{
TimeUnit.SECONDS.sleep(1);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}catch(Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
/**
*** 使用trylock进行尝试锁定,不管锁定与否,方法都将继续执行**
** * 可以根据tryLock的返回值来判定是否锁定,通过boolean值来决定接下来该干什么**
** * 可以指定trylock的时间,如果trylock(time)抛出异常,unlock的处理需要放在finally中**
*/
void m2(){
boolean locked = false;
try{
locked = lock.tryLock(5, TimeUnit.SECONDS); // 如果5秒以后,依旧没有拿到锁,我们就执行下面的代码
System.out.println("m2..."+locked);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
if(locked){
lock.unlock();
}
}
}
public static void main(String[] args) {
Main l = new Main();
new Thread(l::m1).start();
try{
TimeUnit.SECONDS.sleep(1);
}catch(InterruptedException e){
e.printStackTrace();
}
new Thread(l::m2).start();
}
}
Thread t = new Thread(()->{
try{
lock.lockInterruptibly();
System.out.println("t2.start");
}catch(InterruptedException e){
System.out.println("interrupted");
}
});
t.start();
t.interrupt();
闭锁
闭锁是一种同步工具,可以延迟线程的进度直到其到达某种终止状态
闭锁的作用相当于一扇门,当闭锁到达结束状态之前,这扇门一直关闭,并且没有其他线程可以通过,当到达结束状态时,这扇门会打开并允许所有的线程通过
闭锁可以确保某些活动直到其他活动都运行完以后再运行
CountdownLatch是闭锁的实现
CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。
每当一个线程完成了自己的任务后,计数器的值就相应得减1。
当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
构造函数:
用等待的线程数量来进行初始化
public void CountDownLatch(int count){...}
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1
FutureTask也可以用作闭锁
futuretask表示的计算结果是由callable实现的,相当于可以生成结果的runnable,并且可以出于三种状态:等待运行,正在运行和运行完成
可以使用future.get()来获取数值,当正在运行的时候get被阻塞,只有完成了get才能继续
栅栏 Barrier
栅栏类似于闭锁,它能阻塞某些线程直到某个事件的发生,但是latch是等待某个事件发生,而barrier是等待线程
闭锁带着事件的触发次数,当countdown次数为0的时候,所有线程都被释放才能进行后续的工作,但是触发次数不会被重置,如果需要一个可重置次数的闭锁,可以用栅栏
当线程到达Barrier时,线程将会调用await方法,count-1,如果count==0,就释放所有线程,这个方法阻塞直到所有线程都到达栅栏的位置,如果所有线程到达栅栏的位置,那么栅栏打开,所有线程都被释放,栅栏将被重置以便一下次的使用
barrier基于reentrantlock实现
这种行为阻塞的典型用法之一就是将某个问题分成多个部分,每个部分用不同的线程负责,并记得减少闭锁设置的次数。
当所有线程的工作结束后将通过await方法造成的阻塞,如果我们需要反复进行这样的工作就需要使用栅栏。
如果线程池不够大,那么当多个任务通过栅栏机制来彼此协调的时候,将导致线程饥饿死锁
java的实现是CyclicBarrier
默认的构造方法是
CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达屏障位置,线程被阻塞。
另外一个构造方法
CyclicBarrier(int parties, Runnable barrierAction),其中barrierAction任务会在所有线程到达屏障后执行。
https://www.jianshu.com/p/424374d71b67
信号量 Counting Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
mutex是一种semaphore的简化形式,称之为互斥体,初始值为1,代表唯一许可,即互斥锁
应用场景
Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流量控制:
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
在代码中,虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。
当使用无界队列的时候,可以使用Semaphore来控制任务的提交速率:
public class BoundExecutor {
private final **Executor exe;**
private final **Semaphore semaphore;**
public BoundExecutor(Executor exe, int bound){
this.semaphore = new Semaphore(bound);
this.exe = exe;
}
public void submitTask(Runnable command) throws InterruptedException{ // 这样就需要先获得许可才能执行任务
semaphore.acquire();
exe.execute(() -> {
try{
command.run();
}finally{
semaphore.release();
}
});
}
}
常用方法:
acquire 获得许可
add 添加数值,之前必须先获得许可
remove 删除数值,之前必须先获得许可
relase 归还许可证
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。
线程封闭
线程封闭的方法就是不共享数据,这是实现线程安全的最简单的方法
ad-hoc 线程封闭
ThreadLocal
栈封闭
只有通过局部变量才能访问对象,这样使对象封装在了内部,由于局部变量存在于栈内存中,所以叫栈封闭
TreeSet对象的引用保存在animal变量中,animal又是一个局部变量,局部变量保存在栈上,每个线程独享栈,不同线程不同栈导致不能操作同一个TreeSet对象,因此保证了线程安全
ThreadLocal 线程局部变量
ThreadLocal是使用空间换时间,synchronized是使用时间换空间
线程都维护自己独有的线程的一份拷贝,线程之间互不影响
当某个频繁执行的操作需要一个临时对象,例如一个缓冲区,而同时有希望避免在每次执行时都要重新分配该临时对象,就可以使用这项技术
只有当线程本地值的生命周期受限于任务的生命周期时,在线程池中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值
public void fun1(){
ThreadLocal<String> tl = new ThreadLocal();
t1.set("xixi");
String s = t1.get();
t1.remove(); // 只有三个方法
}
内部实现:
class threadlocal<T>{
private Map<Thread, T> map = new HashMap<Thread,T>();
public void set(){
map.put(Thread.currentThread(),data); // 将数据和当前线程绑定
}
public void get(){
map.get(Thread.currentThread());
}
}
同步容器
同步容器实现线程安全的方法就是把他们的状态封装起来,并对每个公有方法同步,使得每次只有一个线程能访问容器的状态
vector
hashtable
Collections.synchronizedXxx
同步容器可以通过客户端加锁来保护复合操作
同步容器使用迭代器一定要加锁
//对于一个非同步的容器,例如ArrayList,我们可以使用Collections.synchronizedList这样的方法,给容器加锁
// 当迭代返回集合的视图的时候,需要手动加锁
迭代器加锁也是一种客户端加锁,需要synchronized中一定要使用m作为锁,这样别的线程不可对m进行操作,如果不加锁会抛出ConcurrentModificationException
Map m = Collections.synchronizedMap(new HashMap());
...
Set s = m.keySet(); // Needn't be in synchronized block
...
synchronized (m) { // Synchronizing on m, not s!
Iterator i = s.iterator(); // Must be in synchronized block
while (i.hasNext())
foo(i.next());
}
public class ListHelp<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
public boolean putIfAbsent(E x) {
synchronized(list){ // 注意这里的锁只能是list,因为Collections.synchronizedList会把自身作为锁,所以我们需要使用和list一样的锁,即它自己,否则如果是两把锁的话,尽管在synchronized里面,但是不能保证原子性
boolean absent = !list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
}
}
**错误示例:**
public synchronized boolean putIfAbsent(E x){ **// 错误原因就是同步方法上的锁和list上的锁不是一个锁,当获取同步方法上的锁以后,并不能保证list操作的原子性**
boolean absent = !list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
// Vector也是线程同步的,这段代码有什么问题?
public class sell_ticket {
static Vector ls = new Vector();
static{
for(int i =0;i<100;i++){
ls.add(i);
}
}
public static void main(String[] args) {
for(int i =0;i<10;i++){
new Thread(()->{
while(ls.size()>0){ // 只保证remove的时候是同步的,get和set分离了,判断完ls.size()以后可能切换到别的线程
ls.remove();
}
});
}
}
}
并发容器
Concurrenthashmap
由于内部使用的是分段锁,所以不能用客户端加锁的方法来执行复合操作,但是ConcurrentHashMap
Concurrentskiplistmap
据有排序功能,基于skiplist 跳表实现
ConccurentLinkedQueue
如果想要使用ConcurrentHashMap完成一些常见的原子操作,可以实现下接口。
public class sell_ticket2 {
static Queue<String> ls = new **ConcurrentLinkedQueue<>()**; // concurrent使用的是分段锁,将容器分为16段,插入的时候只锁定其中的 一段,如果其他线程向其他的分段插入数据的话就不影响
static{
for(int i =0;i<100;i++){
ls.add("票号"+i);
}
}
public static void main(String[] args) {
for(int i =0;i<10;i++){
new Thread(()->{
while(true){
String s = ls.poll();
if(s==null) break;
else System.out.println("sell"+s);
}
});
}
}
}
- ConcurrentSkipListMap
- CopyOnWriteArrayList/CopyOnWriteArraySet
- 写入效率低
- 读取效率高
- 迭代器件不需要对容器进行加锁或复制,目的是为了替代同步list
- 执行操作时进行底层数组的复制,使得修改操作在新的数组上进行,不妨碍原数组中的并发读操作,读操作不需要加锁,添加完元素之后,再把原容器的引用指向新的容器
- 这是一种读写分离的思想,读和写不同的容器
- 添加的时候需要加锁(依赖于reentrantlock),否则多线程会copy多个副本
- copyOnWrite容器的缺点有两个
内存占用问题
- 数据一致性问题,CopyOnWrite容器只保证数据的最终一致性,但是不保证数据的实时一致性,如果希望写入的数据马上就能读到,不要使用CopyOnWrite
public boolean **add**(E e) {
final **ReentrantLock** lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = **Arrays.copyOf(elements, len + 1);**
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
- BlockingQueue 阻塞队列:
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用(这对于生产比消费速度快很多的情况下,可以让消费者有时间赶上进度);如果队列为空,那么take方法将会阻塞直到有元素可用为止。队列可以是无边界的也可以是有边界的,无边界队列永远都不会满,因此无界队列上的put方法也永远不会阻塞。
BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的消费者和生产者。
BlockingQueue的实现有LinkedBlockingQueue,ArrayBlockingQueue和PriorityBlockingQueue,SynchronousQueue
ArrayBlockingQueue 数组结构组成的有界阻塞队列
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。
PriorityBlockingQueue既可以根据元素顺序来比较,也可以使用Comparator来实现比较
SynchronousQueue实际上不是一个真正的队列,它维护一组线程,这些线程等待把元素加入或移除队列,生产者和消费者直接交付,Synchronous队列没有储存功能,所以put和take会一直阻塞,直到有线程参与到交付工作中。好处是交付没有延迟,只有当线程池是无界的或者是可以拒绝任务时,SynchronousQueue才是有实际价值的
deque 双端队列
实现了队列头和队列尾的高效插入和移出。
双端队列适用于工作密取
工作密取是每个消费者都有各自的双端队列,如果一个消费者完成了自己的双端队列中的全部工作,就会从其他消费者的双端队列的尾部秘密地获取工作,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,他会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争
Executor接口:
public class exe implements Executor{
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
Executors:
是一个工具类,提供了不同的工厂方法来创建不同的线程池
ExecutorService接口:
拓展了Executor接口,参数可以使Calable,并且拥有自己的周期方法:
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
ScheduledExecutorService接口:
service.ScheduledAtFixedRate(Runnable command, long initialDelay(起始任务的延迟时间), long period(每隔多久执行一次任务), Timeunit unit(时间单位))
{
固定频率完成任务
}
- newFixedThreadPool (无界的LinkedBlockingQueue)
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不会再改变(如果某个线程由于发生了未预期的Exception而结束,那么会补充一个新的线程)
它是一种固定大小的线程池;
corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
newCachedThreadPool(SynchronousQueue)
每当来一个任务,如果pool没有空闲的线程,那么就起一个新的线程,如果新的线程空闲超过60s,那么就销毁这个线程,线程池的规模没有约束
它是一个可以无限扩大的线程池;
它比较适合处理执行时间比较小的任务;
corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
-
newSingleThreadExecutor
保证依照任务在队列中的顺序来顺序串行完成
(LinkedBlockingQueue 设置最小线程数和最大线程数为1)
ScheduledThreadPoolExecutor
- newScheduledThreadPool (delayedWorkQueue)-> 基于ScheduledThreadPoolExecutor实现 -> 基于 ThreadPoolExecutor和ScheduledExecutorService实现
ScheduledExecutorService s = Executors.newScheduledThreadPool(10);
public static **ScheduledExecutorService newScheduledThreadPool**(int corePoolSize) {
return new **ScheduledThreadPoolExecutor**(corePoolSize);
}
public class **ScheduledThreadPoolExecutor**
extends **ThreadPoolExecutor**
implements **ScheduledExecutorService**
{}
workStealingPool(工作窃取,基于forkjoinpool实现)
forkjoinpool fork切分任务,join合并任务
threadpoolexecutor 除了workstealpool和forkjoinpool,其他线程池都是直接由threadpoolexecutor实现的!threadpoolexecutor中的线程调度依赖于addWorker()方法,这个方法来创建和切换线程,ThreadPoolExecutor继承自ExecutorService和Executor,将提交的任务在线程池中的可用线程中执行
无限制创建线程的不足:
线程生命周期的开销非常高
资源消耗
稳定性
线程池的好处:
线程池中的线程从工作队列中获取一个任务,执行任务,然后返回线程等待下一个任务(工作队列保存了所有等待执行的任务)
线程池的好处在于,使用的是先有的线程而不是创建新的线程,可以在处理多个请求时分摊线程的创建与销毁的开销。当工作线程存在的时候,不用等待工作线程的创建而延迟任务的执行,从而提升了响应性。