JAVA并发梳理(四) 各种安全队列

在 Java 多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。
Java提供的线程安全的 Queue 可以分为

  • 阻塞队列,典型例子是 LinkedBlockingQueue
    适用阻塞队列的好处:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距。
  • 非阻塞队列,典型例子是 ConcurrentLinkedQueue
    当许多线程共享访问一个公共集合时,ConcurrentLinkedQueue 是一个恰当的选择。

LinkedBlockingQueue 多用于任务队列
ConcurrentLinkedQueue 多用于消息队列

下面分别介绍下JDK中阻塞队列和非阻塞队列的各种实现。

Concurrent Queue的两种实现(非阻塞队列)
1. ConcurrentLinkedQueue

a) 基于链表节点, 无界队列。注意size需要遍历整个链表,且如果有其他修改的操作会导致size不准确;尽量使用isEmpty。
b) 通过无锁CAS的方式操作元素,实现了高并发状态下的高性能。

2. ConcurrentLinkedDeque

线程安全双端队列,实现方式基本同ConcurrentLinkedQueue。

BlockingQueue的各种实现(阻塞队列)

BlockingQueue扩展Queue的方法,使其在在队列已满的情况下添加元素时有了4种模式:
其一,抛异常;boolean add(E e);
其二,立即返回false;boolean offer(E e);
其三,等待;void put(E e) throws InterruptedException;
其四,等待一段时间。 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

JDK中对BlockingQueue提供了近十种实现,学习的时候可以从两个主要方面入手,1) 有界还是无界 2)安全性是如何保证的。

1. ArrayBlockingQueue

a) 内部维护了一个定长数组,以便缓存队列中的数据对象。有界队列。
b) 从其关键属性可以看出来,安全访问控制通过ReentrantLock和Condition配合实现,固每次入列和出列时需要获得全局的锁,因此是不能完全并行的。也有人称其为内部没有实现读写分离。

   /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
2. LinkedBlockingQueue

基于链表的阻塞队列。
a) 其内部也维持着一个数据缓冲队列〈该队列由一个链表构成);可以指定容量,也可以不指定,不指定的话,默认最大Integer.MAX_VALUE。也可以算是个无界队列吧。
b) 其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行,因此能够高效的处理并发数据。

 /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
3. SynchronousQueue

一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。内部提供了TransferQueueTransferStack,其实是对应FIFO, FILO,模拟公平锁和非公平锁。注意,在没有需要的生产者或者消费者的时候,SynchronousQueue会通过park阻塞线程。

4. PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparator对象来决定,传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
a) 底层是HEAP实现,数组可扩展,因此是无界队列,因此put永远不会阻塞。
b) 锁使用ReentrantLock,以及一个nonEmpty Condition,只有在队列为空,take的时候会阻塞。

/**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;
5. 双端队列BlockingDeque及实现LinkedBlockingDeque
    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

可以看出来由于只有一个锁,其实也是不能做到真正意义上的两头并行操作的。

LinkedBlockingDeque的基本方法
5. DelayQueue

带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
对这个队列的实现可以结合ScheduledThreadPoolExecutor相关来看。

终于基本把线程安全Queue相关的实现类都过了一遍了,项目中要结合应用场景选择最合适的使用。这需要建立在对各种实现都很熟悉的基础上。

引用
LinkedBlockingQueue 和 ConcurrentLinkedQueue的用法及区别
java挑战高并发(14): LinkedBlockingQueue和ConcurrentLinkedQueue的区别及用法
java线程安全之并发Queue(十三)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容