并发编程-JUC

原子类Atomic

java.util.concurrent.atomic包

保证共享变量操作的原子性、可见性

CAS本质是一条CPU的原子指令,可以保证共享变量修改的原子性

  • 基本类型:AtomicInteger整形原子类

  • 引用类型:AtomicReference引用类型原子类

  • 数组类型: AtomicIntegerArray整形数组原子类

  • 带时间戳引用类型:AtomicStampedReference,解决ABA问题

CAS

compare and swap(比较并交换),Atomic原子类底层实现,在1.8的版本中ConcurrentHashMap也调整为了CAS+Synchronized,AQS中也可以看到大量CAS身影

CAS(V,E,N)

V:修改对象的内存地址

E:期望对象的目标值

N:满足目标值后要更新的新值

Lock

锁划分

  1. 上锁方式划分

    1. 隐式锁:synchronized,不需要显式加锁解锁

    2. 显式锁:JUC提供的锁

  2. 特性划分

    1. 悲观锁/乐观锁

      • JUC锁、synchronized

      • CAS,关系型数据库版本号

    2. 重入锁/不可重入锁

      • 重入锁:ReentrantLock、synchronized

      • 不可重入锁:线程获取锁之后不可重复获取

    3. 公平锁/非公平锁

      • 公平锁:new ReentrantLock(true),多个线程申请同一锁时需要排队,按申请的顺序获取锁

      • 非公平锁:new ReentrantLock(false),多个线程申请同一锁时不需要排队,随机获取锁

    4. 独占锁(排它锁)/共享锁

      • 独占锁:synchronized,ReentrantLock, ReentrantReadWriteLock 的WriteLock写锁

      • 共享锁: ReentrantReadWriteLock 的ReadLock读锁

    5. 自旋锁:CAS

    6. 分段锁:ConcurrentHashMap,给数据分段,每段/Node加锁

    7. 偏向锁、轻量级锁、重量级锁、无锁

Synchronized和JUC锁对比

  • Synchronized无法控制阻塞时长,阻塞不可中断

  • JUC锁可控制时长也可中断

  • 读多写少场景中,Synchronized不论读写都需要同步操作

  • JUC的ReentrantReadWriteLock锁其中ReadLock为共享锁不阻塞

AQS(AbstractQueuedSynchronizer)

抽象同步队列,实现同步器的基础组件,内部通过CAS、volatile、原子类、synchronized实现

AQS是一个FIFO先进先出的双向队列

head、tail:Node类型存储线程,并标记线程状态标记锁类型及队列状态

state:锁状态信息,不同锁不同实现

ConditionObject:条件队列变量,如果需要该功能需要自己实现java.util.concurrent.locks.Lock#newCondition

AQSUML.png

可重入锁ReentrantLock

支持公平和非公平锁

支持重入

支持condition

ReentrantLock#lock源码解析

ReentrantLock实现Lock接口提供锁相关API,关联Sync(继承了AbstractQueuedSynchronizer)提供同步队列的功能

ReentrantLock.Sync抽象类有公平锁和非公平锁的实现

ReentrantLockUML.png
lock时序图.png
  1. lock()调用sync.lock(),sync有FairSync、NonfairSync两种实现,本次看NonfairSync
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))//CAS修改state状态值
                setExclusiveOwnerThread(Thread.currentThread());//设置exclusiveOwnerThread(独占模式同步的当前所有者)为当前线程
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
  1. AbstractQueuedSynchronizer#acquire进行可重入获取锁
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. ReentrantLock.NonfairSync#tryAcquire为AQS实现方法,调用Sync.nonfairTryAcquire
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {//CAS更新状态值为重入次数
                    setExclusiveOwnerThread(current);//更新当前持有线程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//次数不为0说明有线程持有,判断如果是当前线程累加重入次数
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  1. tryAcquire返回false,未获取到锁,放入同步队列AbstractQueuedSynchronizer#addWaiter
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {//将该线程新建的node放到队列尾结点
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//将该线程新建的node放到队列尾结点
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {//将该线程新建的node放到队列尾结点
                    t.next = node;
                    return t;
                }
            }
        }
    }
  1. AbstractQueuedSynchronizer#acquireQueued入队后的当前线程Node继续去获取锁
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//当前Node前置节点如果为头结点,说明当前只有这一个阻塞线程,去尝试获取锁
                    setHead(node);// 获取成功的话,将当前节点线程置为null,并设置为头结点
                    p.next = null; // 将之前的头结点断链,help GC
                    failed = false;
                    return interrupted;
                }
                // 获取锁失败,根据状态判断,设置符合条件前置节点SIGNAL并返回当前线程应该挂起true
                if (shouldParkAfterFailedAcquire(p, node) &&// 未获取锁成功,根据队列中前置线程节点的线程mode判断是否应该挂起
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);//如果获取失败,取消正在进行的获取尝试
        }
    }
  1. AbstractQueuedSynchronizer#selfInterrupt,根据同步队列获取结果挂起当前线程

ReentrantLock#unlock源码解析

unlock时序图.png
  1. unlock()调用sync.release(),其实继承的AbstractQueuedSynchronizer#release
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  1. 调用ReentrantLock.Sync#tryRelease
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())//判断当前线程是否是同步队列持有线程
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);//如果是当前线程并且重入次数为0,设置持有线程为null
            }
            setState(c);//不为0则减少对应重入次数
            return free;
        }
  1. 重入次数设置完成,如果同步队列中还有node,则根据状态进行唤醒

公平锁与非公平锁源码实现区别

非公平锁会先进行一次获取,获取失败才会调用AbstractQueuedSynchronizer#acquire

公平锁在重入次数为0时,先判断队列中是否还有node

公平锁与非公平锁.png
公平锁与非公平锁2.png

返回false表示当前线程可以获取锁,返回true表示队列中有node,当前线程不能获取锁

h=t说明当前队列为空,直接返回false

h != t并且(s = h.next) == null,说明有一个node正在入队,返回true

h != t并且 s.thread != Thread.current,说明队列第一个node不是当前线程,返回true

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

读写锁ReentrantReadWriteLock

通过分离读锁和写锁,使得并发能力比一般的互斥锁有较大提升

支持公平和非公平锁

支持重入

锁降级:写锁可以降级为读锁,但是读锁不能升级为写锁

读锁不支持condition,写锁支持condition

提高锁性能优化

  • 减少锁持有时间:降低锁冲突可能性,将锁竞争代码尽量靠后;局部加锁,必要时加锁

  • 减小锁粒度:ConcurrentHashMap的分段锁

  • 读写分离锁替换独占锁:ReentrantReadWriteLock读多写少场景提升并发能力

  • 锁分离:LinkedBlockingDeque通过condition实现读写功能分离

  • 锁粗化:增加锁范围,减少锁频次

  • JVM对锁优化:偏向锁、轻量级锁、自旋锁、锁消除

并发工具类

CountDownLatch倒计数器

倒数结束前,一直处于等待状态,直到为0,等待线程才继续工作,一次性的,使用完不能再次使用

new CountDownLatch(int count)

await():调用此方法的线程会阻塞,支持多个线程调用,当计数为0,则唤醒线程

countdown():其他线程调用此方法,计数减1

public class CountDownLatchDemo {

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + "下班回家");
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }

        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + "卷死你们,卷王下班回家");
        }, "7").start();
    }
}
D:\Java\jdk1.8.0_144\bin\java.exe 
4下班回家
3下班回家
0下班回家
1下班回家
2下班回家
5下班回家
7卷死你们,卷王下班回家
public class CountDownLatchDemo1 implements Runnable {
    static CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws InterruptedException {
        //模拟火箭发射检查程序
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CountDownLatchDemo1 countDownLatchDemo1 = new CountDownLatchDemo1();
        for (int i = 0; i < 10; i++) {
            executor.submit(countDownLatchDemo1);
        }
        countDownLatch.await();
        System.out.println("fire");
        executor.shutdown();
    }

    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + "check complete");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            countDownLatch.countDown();
        }
    }
}
D:\Java\jdk1.8.0_144\bin\java.exe 
pool-1-thread-4check complete
pool-1-thread-7check complete
pool-1-thread-8check complete
pool-1-thread-6check complete
pool-1-thread-5check complete
pool-1-thread-10check complete
pool-1-thread-3check complete
pool-1-thread-1check complete
pool-1-thread-2check complete
pool-1-thread-9check complete
fire

Process finished with exit code 0

Semaphore信号量

控制多个线程同时访问某个资源,支持重复使用

Semaphore(int permits,Boolean fair):支持公平策略设置,同ReentrantLock

acquire():获取许可证

release():释放许可证

tryxxx:支持响应中断

public class SemaphoreDemo {

    public static void main(String[] args) {
        //抢车位模拟程序
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到车位了");
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + "停3s离开了");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    semaphore.release();
                }
            }, "Car"+i).start();
        }
    }
}
D:\Java\jdk1.8.0_144\bin\java.exe 
Car0抢到车位了
Car1抢到车位了
Car2抢到车位了
Car2停3s离开了
Car1停3s离开了
Car0停3s离开了
Car5抢到车位了
Car4抢到车位了
Car3抢到车位了
Car3停3s离开了
Car5停3s离开了
Car4停3s离开了

Process finished with exit code 0

CyclicBarrier循环栅栏

线程会等待,直到线程到了事先规定的数目,然后触发执行条件进行约定好的动作,可重复使用
CyclicBarrier(int parties, Runnable barrierAction):设置聚集的线程数量和集齐线程数的结果之后要执行的动作
await():阻塞当前线程,待凑齐线程数量之后继续执行

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
            System.out.println("召唤神龙");
        });

        for (int i = 0; i < 14; i++) {
            int finalI = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "收集到第" + finalI + "颗龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + "第" + finalI + "颗龙珠飞走了");
            }, String.valueOf(i)).start();
        }
    }
}
D:\Java\jdk1.8.0_144\bin\java.exe 
0收集到第0颗龙珠
1收集到第1颗龙珠
2收集到第2颗龙珠
3收集到第3颗龙珠
4收集到第4颗龙珠
5收集到第5颗龙珠
6收集到第6颗龙珠
7收集到第7颗龙珠
召唤神龙
8收集到第8颗龙珠
9收集到第9颗龙珠
6第6颗龙珠飞走了
10收集到第10颗龙珠
11收集到第11颗龙珠
0第0颗龙珠飞走了
4第4颗龙珠飞走了
3第3颗龙珠飞走了
12收集到第12颗龙珠
2第2颗龙珠飞走了
1第1颗龙珠飞走了
13收集到第13颗龙珠
召唤神龙
5第5颗龙珠飞走了
9第9颗龙珠飞走了
10第10颗龙珠飞走了
8第8颗龙珠飞走了
13第13颗龙珠飞走了
12第12颗龙珠飞走了
11第11颗龙珠飞走了
7第7颗龙珠飞走了

Process finished with exit code 0

CyclicBarrier和CountDownLatch区别

作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需要等待数字到0,也就是说,CountDownLatch用于事件,而CyclicBarrier用于线程
可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用,而CyclicBarrier可以重复使用

Condition接口(条件队列)

执行await()方法,线程就会进入阻塞状态
执行condition.signal()方法,此时被阻塞的线程会被唤醒

Condition用来代替Object.wait/notify两者用法一样
Condition的await()会自动释放持有的Lock锁这点也和Object.wait一样
调用await时必须持有锁,否则会抛出异常。

public class ConditionDemo {

    public static void main(String[] args) {
        // 模拟线程间通信,按洗剪吹顺序执行5次
        HairSalon hairSalon = new HairSalon();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                hairSalon.wash();
            }
        }).start();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                hairSalon.cut();
            }
        }).start();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                hairSalon.blow();
            }
        }).start();
    }

    static class HairSalon {
        private volatile int status = 1;
        private ReentrantLock lock = new ReentrantLock();
        private Condition c1 = lock.newCondition();
        private Condition c2 = lock.newCondition();
        private Condition c3 = lock.newCondition();

        private void wash() {
            lock.lock();
            try {
                while (1 != status) {
                    c1.await();
                }
                System.out.println(Thread.currentThread().getName() + "洗头");
                status = 2;
                c2.signal();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }

        private void cut() {
            lock.lock();
            try {
                while (2 != status) {
                    c2.await();
                }
                System.out.println(Thread.currentThread().getName() + "剪头");
                status = 3;
                c3.signal();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }

        private void blow() {
            lock.lock();
            try {
                while (3 != status) {
                    c3.await();
                }
                System.out.println(Thread.currentThread().getName() + "吹头");
                status = 1;
                c1.signal();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }
}
D:\Java\jdk1.8.0_144\bin\java.exe 
Thread-0洗头
Thread-1剪头
Thread-2吹头
Thread-0洗头
Thread-1剪头
Thread-2吹头
Thread-0洗头
Thread-1剪头
Thread-2吹头
Thread-0洗头
Thread-1剪头
Thread-2吹头
Thread-0洗头
Thread-1剪头
Thread-2吹头

Process finished with exit code 0

并发容器

  1. List容器:

    • Vector:使用synchronized同步锁,数据具有强一致性。适合于对数据有强一致性要求的场景,但性能较差。

    • CopyOnWriteArrayList:底层使用数组存储数据,使用复制副本实现有锁写操作,不能保证强一致性。适合于读多写少,允许读写数据短暂不一致的高并发场景。

  2. Map容器

    • Hashtable:使用synchronized同步锁,数据具有强一致性。适合于对数据有强一致性要求的场景,但性能较差。

    • ConcurrentHashMap:基于数组+链表+红黑树实现,写操作时通过synchronized同步锁,HashEntry作为锁的粒度支持一定程度的并发写,具有弱一致性。适合于存储数据量较小,读多写少且不要求强一致性的高并发场景。

    • ConcurrentSkipListMap:基于跳表实现的有序Map,使用CAS实现无锁化读写,具有弱一致性。适合于存储数据量大,读写都比较频繁,对数据不要求强一致性的高并发场景。

  3. Set容器

    • CopyOnWriteArraySet:底层使用数组存储数据,使用复制副本实现有锁写操作,不能保证强一致性。适合于读多写少,允许读写数据短暂不一致的场景。

    • ConcurrentSkipListSet:基于跳表实现的有序Set,使用CAS实现无锁化读写,具有弱一致性。适合于存储数据量大,读写都比较频繁,对数据不要求强一致性的高并发场景

ConcurrentHashMap

1.7HashMap实现数组+链表,ConcurrentHashMap保存数据使用16个segment,每个segment加锁

1.8HashMap实现数组+链表+红黑树,ConcurrentHashMap在每个Entry(Node)上加锁,粒度更细

ConcurrentHashMap1.7.png
ConcurrentHashMap1.8.png

ConcurrentHashMap#put

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());//通过高低位的异或位运算,降低hash冲突的可能性
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();//如果tab为空,说明是空map,初始化数组
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果hash位置为空,则cas设置当亲位置为put的节点,成功则跳出循环
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//如果正在扩容,则参与帮助转化
                tab = helpTransfer(tab, f);
            else {// 说明当前hash位置存在元素,可能出现hash冲突,加锁来保证线程安全
                V oldVal = null;
                synchronized (f) {//加锁的是单个node,提升一定程度并发性
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&//hash与key都与新put的数据一致,则根据设置更新该值
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;//如果不相同,则遍历链表,插入到链表最后
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//红黑树操作
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

ConcurrentHashMap#get

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {//map数组hash位置不为空
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))//hash值与key相等,说明是该元素
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;//遍历链表,一次比较hash与key,都相同返回
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

其中有有趣的位运算

ConcurrentHashMap#spread 方法的 (h ^ (h >>> 16)) & HASH_BITS;

个人理解,高16位与低16位异或并保留int范围返回,进一步降低hash的冲突概率

tabAt(tab, i = (n - 1) & hash))

类似于HashMap,容量都为2的幂次方,防止传入有误,会找寻大于该值最近的一个2的幂次方,并设置为容量
因容量设置都为2的幂次方,n-1则为每一位都是1的二进制数,位与操作,即与取模效果一样,达到hash寻址的作用
为了效率达到极致,才采用了位运算来代替%取模操作

CopyOnWriteArrayList

COW,写时复制,写操作时并不会修改原内容,而对其进行一次复制副本,写入完成后再使用副本替换原内容
array使用volatile,保证多线程操作可见性

CopyOnWriteArrayList#add(E)

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();// 写操作加锁
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);//获取数据副本
            newElements[len] = e;
            setArray(newElements);//写入后直接替换原数据
            return true;
        } finally {
            lock.unlock();
        }
    }

CopyOnWriteArrayList#get(int)

    public E get(int index) {
        return get(getArray(), index);//从数组中获取下标位置元素
    }

并发队列

ArrayBlockingQueue基于数组实现的有界阻塞队列
LinkedBlockingQueue基于链表实现的有界阻塞队列
SynchronousQueue:不存储元素的阻塞队列
PriorityBlockingQueue:支持按优先级排序的无界阻塞队列
DelayQueue:优先级队列实现的无界阻塞队列
LinkedTransferQueue:基于链表实现的无界阻塞队列
LinkedBlockingDeque:基于链表实现的双向无界阻塞队列

ArrayBlockingQueue案例:1个面试官面试10个程序员,只能同时等待3人

public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        Interviewer interviewer = new Interviewer(blockingQueue);
        Engineers engineers = new Engineers(blockingQueue);
        new Thread(interviewer).start();
        new Thread(engineers).start();

    }

    static class Interviewer implements Runnable {
        private BlockingQueue<String> queue;

        public Interviewer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("面试官,我准备好面试了");
            String msg;
            try {
                while (!(msg = queue.take()).equals("stop")) {
                    System.out.println(msg + "面试开始了");
                    Thread.sleep(1000);
                    System.out.println(msg + "面试结束了");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("所有人面试结束");
        }
    }

    static class Engineers implements Runnable {
        private BlockingQueue<String> queue;

        public Engineers(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                String msg = "程序员" + i;
                try {
                    queue.put(msg);
                    System.out.println(msg + "就坐,等待面试");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            try {
                queue.put("stop");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
D:\Java\jdk1.8.0_144\bin\java.exe 
面试官,我准备好面试了
程序员0就坐,等待面试
程序员1就坐,等待面试
程序员2就坐,等待面试
程序员0面试开始了
程序员3就坐,等待面试
程序员0面试结束了
程序员1面试开始了
程序员4就坐,等待面试
程序员1面试结束了
程序员2面试开始了
程序员5就坐,等待面试
程序员2面试结束了
程序员3面试开始了
程序员6就坐,等待面试
程序员3面试结束了
程序员4面试开始了
程序员7就坐,等待面试
程序员4面试结束了
程序员5面试开始了
程序员8就坐,等待面试
程序员5面试结束了
程序员6面试开始了
程序员9就坐,等待面试
程序员6面试结束了
程序员7面试开始了
程序员7面试结束了
程序员8面试开始了
程序员8面试结束了
程序员9面试开始了
程序员9面试结束了
所有人面试结束

Process finished with exit code 0

线程池

基于池化思想管理线程的工具,为避免频繁创建销毁线程,线程池维护一定数量激活的线程,等待可并发执行的任务

优点

降低资源消耗:线程复用
提高响应速度:任务到达,直接派线程执行,无需等待
提高线程可管理型
提供可扩展性

参数

int corePoolSize:核心线程数
int maximumPoolSize:最大线程数,为降低创建销毁消耗,将核心与最大设置一致并不设置存活时间
long keepAliveTime:存活时间
TimeUnit unit:存活时间单位
BlockingQueue<Runnable> workQueue:任务存储队列(阻塞队列)
ThreadFactory threadFactory:线程工程,创建新线程工厂类
RejectedExecutionHandler handler:无法接受任务时的拒绝策略

拒绝策略

AbortPolicy:直接抛出异常,说明任务没有提交成功
DiscardPolicy:线程池会默默的丢弃任务,不会发出通知
DiscardOldestPolicy:队列中存有很多任务,将队列中存在时间最久的任务给丢弃
CallerRunsPolicy:当线程池无法处理任务时,那个线程提交任务由那个线程负责运行。好处在于避免丢弃任务和降低提交任务的速度,给线程池一个缓冲时间,但任务过多时性能下降明显

线程池任务提交过程

线程池任务提交流程.png

创建线程池建议

不建议使用java.util.concurrent.Executors的api创建线程池,根据业务场景需要,使用手动创建线程池

线程数

核心线程数与最大线程数设置一致,避免创建和销毁的额外开销,不用设置存活时间

根据业务场景

  • CPU密集型,一般设置线程数为CPU核心数N
  • IO密集型,CPU空闲,一般设置线程数为2倍CPU核心数2N+1
  • 常用公式:线程数N=CPU核心数*CPU目标使用率(1+线程等待时间/线程CPU时间)

根据监控应用,统计所执行任务平均耗时tasktime及每秒需要执行任务数tasks
需要线程数:tasks/(1/tasktime)
队列长度:(corePoolSize/tasktime)responsetime: (20/0.1)2=400

ThreadLocal

线程本地变量,保证多个线程对变量的安全访问,可以将变量放到ThreadLocal 类型的对象中,使变量在每个线程中都有独立值
ThreadLocal 是解决线程安全问题一个较好方案,它通过为每个线程提供一个独立的本地值,去解决并发访问的冲突问题
ThreadLocal在Spring中作用巨大,在管理Request作用域中的Bean、事务、任务调度、AOP等模块都有它

使用场景

  • 线程隔离

  • 跨函数传递数据

原理

java.lang.ThreadLocal#set

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);//获取当前线程的threadLocals,即该线程所有的ThreadLocal
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);//map为空初始化
    }

java.lang.ThreadLocal#get

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);//获取该线程的threadLocals
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);//map中获取对应Entry 
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();//为空返回初始值
    }

java.lang.ThreadLocal#remove

        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);//继续往后遍历删掉过时的entry
                    return;
                }
            }
        }

java.lang.ThreadLocal.ThreadLocalMap.Entry

        //WeakReference<ThreadLocal<?>,ThreadLocalMap的ThreadLocalkey为弱引用
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

Entry 的 Key 为什么需要使用弱引用

threadLocal.png

当一个类初始化并使用ThreadLocal类时,此时该方法与ThreadLocal对象有强引用,同时ThreadLocalMap与ThreadLocal存在key的弱引用,当方法调用完成后,强引用消失,此时发生GC,ThreadLocal弱引用的key会被回收,防止内存泄漏

同时,key回收后,value并未回收,容易产生内存泄漏,所以使用完成应该尽量调用java.lang.ThreadLocal#remove,将其置为null,帮助GC回收。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容