源码篇-LinkedBlockingQueue元素操作

一、添加元素

1. put函数

public void put(E e) throws InterruptedException {
    // 放入空元素,直接抛异常
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        
        // 如果队列已满,等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 放入元素
        enqueue(node);
        // 元素数量加1,返回旧的元素数量
        c = count.getAndIncrement();
        // 如果队列未满,唤醒其它线程继续添加元素
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果加入队列前元素个数为0,因为现在已经有了元素,所有这时需要唤醒阻塞的消费线程去消费
    if (c == 0)
        signalNotEmpty();
}
2.offer函数带时长
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 加入的元素为空,则抛异常
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 如果队列已满,等待,如果等待超过时长,直接返回false
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        // 加入元素
        enqueue(new Node<E>(e));
        // 元素数量加1,返回旧的元素数量
        c = count.getAndIncrement();
        // 如果队列未满,唤醒其它线程继续添加元素
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果加入队列前元素个数为0,因为现在已经有了元素,所有这时需要唤醒阻塞的消费线程去消费
    if (c == 0)
        signalNotEmpty();
    return true;
}
3.offer函数不带时长
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
  • 三个添加元素方法的区别
    • put(E e)添加元素时,如果队列已满,则一直等待,直到被唤醒
    • offer(E e, long timeout, TimeUnit unit)添加元素时,如果队列已满,则等待指定时长,等待超时,则返回false
    • offer(E e)添加元素时,如果队列已满,直接返回false

二、获取元素

1. take方法
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 如果队列为空,等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 弹出元素
        x = dequeue();
        // 元素个数减1,并返回旧的元素个数
        c = count.getAndDecrement();
        // 如果旧的元素个数大于1,说明现在还有元素可取,唤醒其它线程消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果旧的元素等于最大容量,说明现在可以添加元素了,唤醒其它线程添加元素
    if (c == capacity)
        signalNotFull();
    return x;
}
2.带时长poll方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
3.不带时长poll方法
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
4. peek方法
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
  • 四个方法的区别
    • take(),如果队列为空,则一直等待
    • poll(long timeout, TimeUnit unit),如果队列为空,则等待一定时长,如果等待超时,返回null
    • poll() ,如果队列为空,直接返回null
    • peek(),获取队列第一个元素,为空,直接返回null,不会将队列的元素减1,不执行唤醒操作
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容