BlockingQueue简析

BlockingQueue

BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去。

核心方法

放入数据

add(object)

队列没满的话,放入成功。否则抛出异常。

offer(object)

表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

offer(E o, long timeout, TimeUnit unit)

可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

put(object)

把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.

获取数据

poll(time)

取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

poll(long timeout, TimeUnit unit)

从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

take()

取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

drainTo()

一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

线程池中对BlockingQueue的两个实现

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。
当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

此队列的默认和最大长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

此队列按照先进先出的原则对元素进行排序

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

LinkedBlockingQueue数据是放在一个Node节点中

/**
 * Linked list node class
 */
static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}

LinkedBlockingQueue有两个锁,一个放入锁,一个取得锁。分别对应放入元素和取得元素时的操作。这是由链表的结构所确定的。但是删除一个元素时,要同时获得放入锁和取得锁。

阻塞等待放入

public void put(E e) throws InterruptedException {  
    if (e == null) throw new NullPointerException();  
    int c = -1;  
    Node<E> node = new Node<E>(e);  
    final ReentrantLock putLock = this.putLock;  
    final AtomicInteger count = this.count;  
    putLock.lockInterruptibly(); //取得放入锁  
    try {  
        while (count.get() == capacity) {//队列已满  
            notFull.await();  
       }  
       enqueue(node);//入队  
       c = count.getAndIncrement();//当前队列中元素个数加1  
       if (c + 1 < capacity)  
           notFull.signal();  
    } finally {  
        putLock.unlock();  
    }  
    if (c == 0)  
        signalNotEmpty();  
}  

取队头元素。没有的话返回null,有的话返回元素,并将队列中删除此元素

public E poll() {  
    final AtomicInteger count = this.count;  
    if (count.get() == 0)  
        return null;  
    E x = null;  
    int c = -1;  
    final ReentrantLock takeLock = this.takeLock;  
    takeLock.lock();//获得取得锁  
    try {  
        if (count.get() > 0) {  
            x = dequeue();//出队  
            c = count.getAndDecrement();//当前队列中元素个数减去1  
            if (c > 1)  
                notEmpty.signal();//不为空条件成功  
        }  
    } finally {  
        takeLock.unlock();  
    }  
    if (c == capacity)  
        signalNotFull();  
    return x;  
}  

删除时要同时取得放入锁和取得锁

public boolean remove(Object o) {  
    if (o == null) return false;  
    fullyLock();//同时取得放入锁和取得锁  
    try {  
        for (Node<E> trail = head, p = trail.next;  
             p != null;  
             trail = p, p = p.next) {  
            if (o.equals(p.item)) {  
                unlink(p, trail);  
                return true;  
            }  
        }  
        return false;  
    } finally {  
        fullyUnlock();  
    }  
}  

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

SynchronousQueue

  1. 容量为0,无论何时 size方法总是返回0
  2. put操作阻塞, 直到另外一个线程取走队列的元素。
  3. take操作阻塞,直到另外的线程put某个元素到队列中。
  4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素

构造方法上接收boolean参数,表示这是一个公平的基于队列的排队模式,还是一个非公平的基于栈的排队模式。

public SynchronousQueue(boolean fair) {  
    transferer = fair ? new TransferQueue() : new TransferStack();  
} 

公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容