6-Java并发容器和框架

1.ConcurrentHashMap

①为什么要使用ConcurrentHashMap

1)线程不安全的HashMap

多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%。

以下代码会引起死循环(1.8之前)

        final HashMap<String, String> map = new HashMap<>(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");
                        }
                    }, "ftf" + i).start();
                }
            }
        }, "ftf");
        t.start();
        t.join();
        System.out.println("ok");

HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一单形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

1.8之前HashMap在并发执行put操作时,需要扩容的时候会出现链表形成环形数据结构:1.7扩容代码

    void transfer(Entry[] newTable, boolean rehash) {
        int newCapacity = newTable.length;
        for (Entry<K,V> e : table) {
            while(null != e) {
                Entry<K,V> next = e.next;//这一步被挂起,就可能出现环
                if (rehash) {
                    e.hash = null == e.key ? 0 : hash(e.key);
                }
                int i = indexFor(e.hash, newCapacity);
                e.next = newTable[i];
                newTable[i] = e;
                e = next;
            }
        }
    }

假设原来table的a处,是 k1→k2→null,假设所有的key计算出在新的table中的位置都是b

线程A:

e=k1;

next=k1.next=k2;

线程B:

e=k1;

next=k1.next=k2;k1.next=newTable[b]=null;newTable[b]=k1;e=k2;

next=k2.next=null;k2.next=newTable[b]=k1;newTable[b]=k2;e=null;

//现在顺序是:k2→k1→null

线程A:

k1.next=newTable[b]=k2;newTable[b]=k1;e=k2;

//现在顺序是:k1→k2→k1

next=k2.next=k1;k2.next=newTable[b]=k1;newTable[b]=k2;e=k1;

next=k1.next=k2;k1.next=newTable[b]=k2;newTable[b]=k1;e=k2;

next=k2.next=k1;k2.next=newTable[b]=k1;newTable[b]=k2;e=k1;

.......//形成死循环

猜测1.8不会形成死循环,这里并不会出现反向更改Node的next引用的情况

                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;//原位置
                        Node<K,V> hiHead = null, hiTail = null;//新位置
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {//不需要改变位置
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {//需要改变位置
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
2)效率低下的HashTable

HashTable容器使用synchronized来保证线程安全,在线程竞争激烈的情况下HashTable的效率非常低下。

3)ConcurrentHashMap的锁分段技术(1.8之前)可有效提升并发访问率

1.8之前:容器中有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问率,这个就是锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,。

1.8:锁粒度降低,如果形成链表,以链表第一个节点为synchronized的对象。

②ConcurrentHashMap的结构

在JDK1.7版本中,ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成:

JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构实现,并发控制使用synchronized和CAS来操作,整个看起来就像是优化过且现场安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本。

常量设计:

// node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是2的幕数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED     = -1;
// 树根节点的hash值
static final int TREEBIN   = -2;
// ReservationNode的hash值
static final int RESERVED  = -3;
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
 *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
 *当为0时:代表当时的table还没有被初始化
 *当为正数时:表示初始化或者下一次进行扩容的大小
private transient volatile int sizeCtl;

内部一些数据结构:

Node是ConcurrentHashMap存储结构的基本单元,用于存储数据,数据结构就是一个链表,但只允许对数据进行查找,不允许进行修改:

static class Node<K,V> implements Map.Entry<K,V> {
    //链表的数据结构
    final int hash;
    final K key;
    //val和next都会在扩容时发生变化,所以加上volatile来保持可见性和禁止重排序
    volatile V val;
    volatile Node<K,V> next;
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    //不允许更新value 
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }
    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }
    //用于map中的get()方法,子类重写
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成红黑树。

static final class TreeNode<K,V> extends Node<K,V> {
    //树形结构的属性定义
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red; //标志红黑树的红节点
    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }
    //根据key查找 从根节点开始找出相应的TreeNode,
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        if (k != null) {
            TreeNode<K,V> p = this;
            do  {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}

TreeBin存储树形结构的容器,树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制。

static final class TreeBin<K,V> extends Node<K,V> {
    //指向TreeNode列表和根节点
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // 读写锁状态
    static final int WRITER = 1; // 获取写锁的状态
    static final int WAITER = 2; // 等待写锁的状态
    static final int READER = 4; // 增加数据时读锁的状态
    /**
     * 初始化红黑树
     */
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                        TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }
    ......
}

③ConcurrentHashMap的初始化

JDK1.7 ConcurrentHashMap的初始化会通过位与运算来初始化Segment的大小,用ssize来标识,如下:

        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }

concurrencyLevel的最大值是65535,这意味着segments数组的长度最大为65536,对应的二进制是16位。

④ConcurrentHashMap的操作

参考:https://www.cnblogs.com/study-everyday/p/6430462.html

2.ConcurrentLinkedQueue

并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。

使用阻塞算法的队列,可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。

非阻塞的实现方式则可以使用循环CAS的方式来实现。

ConcurrentLinkedQueue是使用非阻塞的方式来实现线程安全队列的。它是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

①ConcurrentLinkedQueue的结构

②入队列

入队列的过程

    public boolean offer(E e) {
        checkNotNull(e);
        //入队前,创建一个入队节点
        final Node<E> newNode = new Node<E>(e);
        //死循环,入队不成功反复入队
        for (Node<E> t = tail, p = t;;) {
        //创建一个指向tail节点的引用,用p来标识队列的尾节点,默认情况下等于tail节点。
            Node<E> q = p.next;//获取p节点的下一个节点
            if (q == null) {//p节点是尾节点
                if (p.casNext(null, newNode)) {//设置p节点的next节点为入队节点
                    if (p != t) // t已经不是尾节点了,t=tail,p经过循环后已经改变。
                        casTail(t, newNode);  // 更新tail节点,允许失败
                    return true;
                }
                //另一个线程CAS成功了,重新读取下一个
            }
            else if (p == q)
                //我们已经脱离了队列。如果tail没有变化,它也已经脱离了队列,在这种情况下,我们需要跳到头部,否则跳到尾部
                p = (t != (t = tail)) ? t : head;
            else
                //检查tail更新
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

入队列就是将入队节点添加到队列的尾部。tail节点不总是尾节点。入队方法永远返回true,所以不要通过返回值判断入队是否成功。

在一个队列中依次插入4个节点,示例:

  • 添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
  • 添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。(会跳过1节点)。
  • 添加元素3。设置tail节点的next节点为元素3节点。
  • 添加元素4。设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

③出队列

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;//获取p节点的元素
                //p节点的元素不为空,使用CAS设置p节点引用的元素为null,如果CAS成功,返回p节点的元素
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // 一次跳跃两个节点
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {//如果p的下一个节点也为空,说明这个队列已经空了。
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

出队列的就是从队列里返回一个几点元素,并清空该节点对元素的引用。

3.Java中的阻塞队列

①什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,知道队列不满。

2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

在阻塞队列不可用时,这两个附加操作提供了4种处理方式:

  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。(AbstractQueue)
  • 返回特殊值:往队列里插入元素时,成功返回true。移除方法取出一个元素,如果没有则返回null。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,知道队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一点时间,如果超过了指定的时间,生产者线程就会退出。

注意:无界阻塞队列,队列不可能出现满的情况。

②Java里的阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的支持延时获取元素的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
1)ArrayBlockingQueue

是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。用重入锁ReentrantLock来实现线程访问队列的公平性。

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
2)LinkedBlockingQueue

一个用来链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。

3)PriorityBlockingQueue

一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。不能保证同优先级元素的顺序。

4)DelayQueue

一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

适用于以下场景:

  • 缓存系统的设计:可以保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

a.如何实现Delayed接口

参考java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask类的实现,一共有三步:

  • 在对象创建的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的先后顺序。

            private static final AtomicLong sequencer = new AtomicLong(0);
            ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                super(r, result);
                this.time = ns;
                this.period = period;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    

  • 实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒。当time小于当前时间时,getDelay会返回负数。

            public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), TimeUnit.NANOSECONDS);
            }
    
  • 实现compareTo方法来指定元素的顺序。例如,让延时时间最长的放在队列的末尾。

            public int compareTo(Delayed other) {
                if (other == this) // compare zero ONLY if same object
                    return 0;
                if (other instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                    long diff = time - x.time;
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    else if (sequenceNumber < x.sequenceNumber)
                        return -1;
                    else
                        return 1;
                }
                long d = (getDelay(TimeUnit.NANOSECONDS) -
                          other.getDelay(TimeUnit.NANOSECONDS));
                return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
            }
    

b.如何实现延时阻塞队列

当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) //到时间了
                        return q.poll();
                    else if (leader != null) //leader是等待获取队列头部元素的线程不为空,表示已经有线程在等待获取队列的头元素。
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//将当前线程设置成leader
                        try {
                            available.awaitNanos(delay);//等待delay时间
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
5)SynchronousQueue

一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。默认采用非公平性策略访问队列。

    public SynchronousQueue(boolean fair) {//true则等待的线程会采用先进先出的顺序访问队列
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

SynchronousQueue负责把生产者线程处理的数据直接传递给消费者线程。队列本身不存储元素,适合传递性场景。吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

6)LinkedTransferQueue

一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法。

a.transfer方法

  • 当前有消费者正在等待接收元素(消费者使用take方法或带时间限制的poll方法时),transfer方法可以把生产者传入的元素liketransfer(传输)给消费者。
  • 没有消费者在等待接收元素,transfer方法会将元素采暖费在队列的tail节点,并等到该元素被消费者消费了才返回。

关键代码如下:

Node pred = tryAppend(s, haveData);//试图把存放当前元素的s节点作为tail节点。
return awaitMatch(s, pred, e, (how == TIMED), nanos);//让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。

b.tryTransfer方法

用来试探生产者传入的元素是否能直接传递给消费者。如果没有消费者等待接收元素,则返回false。

和transfer方法的区别是无论消费者是否接收,方法立即返回。

tryTransfer(E e, long timeout, TimeUnit unit),试图把生产者传入的元素直接传递给消费者。但如果没有消费者消费该元素,则等待指定的时间再返回,如果超时还没有消费元素,返回false,超时时间内消费了元素,返回true。

7)LinkedBlockingDeque

一个由链表结构组成的双向阻塞队列。可以从队列的两端插入和移出元素,多线程同时入队时,减少了一半订单竞争。相比其他阻塞队列,多了addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法。

以first结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以last结尾的方法表示插入、获取(peek)或移除双端队列的最后一个元素。

另外,插入方法add等同于addLast,remove等效于removeFirst。

初始化时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

③阻塞队列的实现原理

1)使用通知模式实现。

当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

    private final Condition notEmpty;
    private final Condition notFull;
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

4.Fork/Join框架

①什么是Fork/Join框架

Fork/Join框架是Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

②工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

每个线程一个队列,当前线程将任务执行完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活。为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

③Fork/Join框架的设计

  • 分割任务。需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
  • 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join框架使用两个类来完成以上两件事情:

1)ForkJoinTask:使用Fork/Join框架,先创建一个ForkJoin任务。它提供在任务中执行fork和join操作的机制。一般我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask:用于有返回结果的任务。

2)ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

④使用Fork/Join框架

示例(计算1+2+3+4):

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;//阈值
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //如果任务大于阈值,就分裂长两个子任务计算
            int middle = (start + end) /2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();//执行fork方法时,又会进入compute方法
            rightTask.fork();
            //等待子任务执行完,并得到其结果
            Integer leftResult = leftTask.join();//join方法会等待子任务执行完并得到其结果
            Integer rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一个计算任务,负责计算
        CountTask task = new CountTask(1,4);
        //执行一个任务
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

⑤Fork/Join框架的异常处理

ForkJoinTask在执行的时候可能会抛出异常,但我们在主线程里没办法直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException()方法获取异常。

getException()返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

        if (result.isCompletedAbnormally()) {
            System.out.println(result.getException());
        }

⑥Fork/Join框架的实现原理(1.7)

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

1)ForkJoinTask的fork方法实现原理

    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);//将当前任务存放在ForkJoinTask数组队列里。
        return this;
    }
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();//唤醒或创建一个工作线程来执行任务。
            else if (s == m)
                growQueue();
        }
    }

2)ForkJoinTask的join方法实现原理

join方法主要作用是阻塞当前线程并等待获取结果。

    public final V join() {
        //通过doJoin方法得到当前任务的状态
        //任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)、出现异常(EXCEPTIONAL)
        if (doJoin() != NORMAL) 
            return reportResult();
        else
            return getRawResult(); //任务状态是已完成,直接返回任务结果
    }
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)  //任务状态是被取消,抛出CancellationException
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //任务状态是抛出异常,则直接抛出对应的异常
            UNSAFE.throwException(ex);
        return getRawResult();
    }
    private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;//任务执行完成,直接返回任务状态
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {//从任务数组里取出任务
                try {
                    completed = exec();//执行任务
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);//执行出现异常,记录异常,并将任务状态设置为EXCEPTIONAL
                }
                if (completed)
                    return setCompletion(NORMAL);//任务顺利执行完成,设置任务状态为NORMAL
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342