BlockingQueue接口及其实现

前言

最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。

什么是阻塞队列?

A java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
简而言之就是当队列满时,插入阻塞;当队列为空时,删除(取出)阻塞。常用于生产者和消费者场景。

自己实现一个简单的阻塞队列,实现原理就是通知模式,当队列满时,添加元素阻塞,当队列空时,删除阻塞

/**
 * 意思就是自定义一个队列,当队列中的元素个数等于指定的长度时,put方法要想再加入元素,必须等待take从集合取出元素之后才能放入,这边涉及到wait和notify,
 * 同理,take的时候如果队列中没有元素(此时的元素集合长度为0,那么也要执行等待,等到put进元素才能取出,这边也涉及到wait和notify)
 *
 */
public class MyQueue {

    //1 需要一个承装元素的集合
    private LinkedList<Object> list = new LinkedList<>();

    //2 需要一个计数器
    private AtomicInteger count = new AtomicInteger(0);

    //3 需要制定上限和下限
    private final int minSize = 0;

    private final int maxSize ;

    //4 构造方法
    public MyQueue(int size){
        this.maxSize = size;
    }

    //5 初始化一个对象 用于加锁
    private final Object lock = new Object();


    //put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
    public void put(Object obj){
        synchronized (lock) {
            while(count.get() == this.maxSize){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //1 加入元素
            list.add(obj);
            //2 计数器累加
            count.incrementAndGet();
            //3 通知另外一个线程(唤醒)
            lock.notify();
            System.out.println("新加入的元素为:" + obj);
        }
    }


    //take: 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.
    public Object take(){
        Object ret = null;
        synchronized (lock) {
            while(count.get() == this.minSize){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //1 做移除元素操作
            ret = list.removeFirst();
            //2 计数器递减
            count.decrementAndGet();
            //3 唤醒另外一个线程
            lock.notify();
        }
        return ret;
    }

    public int getSize(){
        return this.count.get();
    }


    public static void main(String[] args) {

        //线程run中使用变量必须使用final修饰
        final MyQueue mq = new MyQueue(5);
        mq.put("a");
        mq.put("b");
        mq.put("c");
        mq.put("d");
        mq.put("e");

        System.out.println("当前容器的长度:" + mq.getSize());

        Thread t1 = new Thread(() -> {
            mq.put("f");
            mq.put("g");
        },"t1");

        t1.start();


        Thread t2 = new Thread(() -> {
            Object o1 = mq.take();
            System.out.println("移除的元素为:" + o1);
            Object o2 = mq.take();
            System.out.println("移除的元素为:" + o2);
        },"t2");


        try {
            //使用此单元执行 Thread.sleep.这是将时间参数转换为 Thread.sleep 方法所需格式的便捷方法。
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t2.start();


    }
}

jdk并发包提供的BlockingQueue及其实现


图片.png

然后看其比较主要的实现


图片.png

ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没有实现读写分离,长度是需要定义的,按照先进先出(FIFO)的原则对元素进行排序。是有界队列(bounded),在很多场合非常适合使用。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列。

可以使用构造参数实现公平的阻塞队列

图片.png
/**
 * ArrayBlockingQueue是有界的阻塞队列,如果插入队列中的元素多余自己定义的长度会抛出InterruptedException异常
 *
 */
public class ArrayBlockingQueueTest {

    public static void main(String[] args) throws Exception{
        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5);
        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
//        array.add("e");
//        array.add("f");
        //offer方法表示将元素加到队列中的最后,第二个参数和第三个参数表示如果队列满的话,等待时间后还是满的话插入失败
        System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
        System.out.println(array);
    }
}

LinkedBlockingQueue

是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
链表队列通常比数组队列有更好的吞吐量,但是在大多数并发应用中可预见性低。

/**
 * 阻塞队列,不指定队列长度,底层是由链表实现的,LinkedBlockingDeque之所以能够高效的处理并发数据,
 * 是因为其内部实现采用分离锁(读写分离两个锁),是一个无界队列
 */
public class LinkedBlockingQueueTest {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        //offer将元素添加到队列的末尾,如果队列不满,添加成功返回true,队列full则返回false
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.offer("e");
        q.add("f");

        for(Iterator<String> iterator = q.iterator();iterator.hasNext();){
            String value = iterator.next();
            System.out.println(value);
        }

        System.out.println(".............................");

        q.stream().forEach(str -> System.out.println(str));

        System.out.println("..............................");

        List<String> list = new ArrayList<>();

        //javadoc没有这方法的说明,根据返回的结果很明显了,调用drainTo方法返回的是向list集合中添加的元素
        System.out.println(q.drainTo(list, 3));

        System.out.println(list.size());
        for (String string : list) {
            System.out.println(string);
        }
    }
}

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界(unbounded)阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

//take方法按照规定的优先级顺序取出数据
public class PriorityBlockingQueueTest {
    public static void main(String[] args) throws Exception{
        PriorityBlockingQueue<String> q = new PriorityBlockingQueue<>();

        q.add("world");
        q.add("welcome");
        q.add("aaaaaa");
        q.add("hello");

        q.stream().forEach(str -> System.out.println(str));


        PriorityBlockingQueue<Student> studentPriorityBlockingQueue = new PriorityBlockingQueue<>();
        Student student = new Student(1,"miaozhihao");
        Student student1 = new Student(2,"zob");
        Student student2 = new Student(3,"aaaa");
        Student student3 = new Student(2,"tom");
        Student student4 = new Student(2,"bbbb");

        studentPriorityBlockingQueue.add(student);
        studentPriorityBlockingQueue.add(student1);
        studentPriorityBlockingQueue.add(student2);
        studentPriorityBlockingQueue.add(student3);
        studentPriorityBlockingQueue.add(student4);

        System.out.println(studentPriorityBlockingQueue.take());
        System.out.println(studentPriorityBlockingQueue.take());
        System.out.println(studentPriorityBlockingQueue.take());
        System.out.println(studentPriorityBlockingQueue.take());
        System.out.println(studentPriorityBlockingQueue.take());



    }

}


class Student implements Comparable<Student>{

    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Student(int id,String name){
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "Student{" + "id=" + id + ", name='" + name + '\'' + '}';
    }

    @Override
    public int compareTo(Student student) {
        return this.id - student.id != 0 ? (this.id - student.id) : (this.name.compareToIgnoreCase(student.name));
    }
}

DelayQueue

带有延迟时间的Queue,其中的元素只有当前指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个无界队列,应用场景很多

·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

/**
 * 局限,当队列中的head元素时间到了才能取下一个元素,如果第一个元素延迟时间最长,那么等待时间到了之后,元素中的元素一起取出来了
 * 所以,我的demo daqiu1 设置的延迟时间是最短的
 *
 */
public class DelayQueueTest {
    public static void main(String[] args) throws Exception{
        Daqiu daqiu1 = new Daqiu("miaozhihao",1 * 1000 +System.currentTimeMillis());
        Daqiu daqiu2 = new Daqiu("zhangsan",4 * 1000 +System.currentTimeMillis());
        Daqiu daqiu3 = new Daqiu("lisi",2 * 1000 +System.currentTimeMillis());
        Daqiu daqiu4 = new Daqiu("wangwu",13 * 1000 +System.currentTimeMillis());

        final DelayQueue<Daqiu> daqius = new DelayQueue<>();

        Thread th1 = new Thread(() -> {
            while(true){
                try {
                    Daqiu daqiu = daqius.take();
                    System.out.println(daqiu);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        th1.start();

        daqius.add(daqiu1);
        daqius.add(daqiu2);
        daqius.add(daqiu3);
        daqius.add(daqiu4);


    }
}

//加入到DelayQueue的方法必须要实现Delayed接口,而Delayed继承Comparable接口
class Daqiu implements Delayed{

    //打球的人
    private String name;
    //截止时间
    private long endTime;
    //定义时间工具类
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    public Daqiu(String name,long endTime){
        this.name = name;
        this.endTime = endTime;
    }

    @Override
    public String toString() {
        return "打球的人为:"+this.name;
    }

    //使用给定的时间单位返回与当前对象关联的剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        return endTime - System.currentTimeMillis();
    }

    //compareTo是根据getDelay方法提供指定的顺序
    @Override
    public int compareTo(Delayed delayed) {
        Daqiu daqiu = (Daqiu)delayed;
        return this.getDelay(this.timeUnit) - daqiu.getDelay(this.timeUnit) > 0 ? 1:0;
    }
}

SynchronousQueue

A BlockingQueue blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
总结一下:
不存储元素的阻塞队列。每一个插入(put)操作必须等待一个删除(take)操作,否则不能继续添加元素。阻塞表现在你不能peek一个synchronous队列除非存在另一个线程存在并且去删除这个队列,不能去插入一个元素除非另外一个线程去删除它。只有另外一个线程在删除元素的时候才能遍历。
它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。

图片.png

SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

/**
 * take方法睡眠3s,那么insert操作就阻塞3s等待去take的时候才能去消费它
 */
public class SynchronousQueueTest {
    public static void main(String[] args) throws Exception{

        final SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread t1 = new Thread(() -> {
            try {
                String  value = q.take();
                Thread.sleep(3000);
                System.out.println(value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> q.add("hello"));
        t2.start();
    }
}

其实看了源码也是使用LockSupport工具类的加锁方法实现阻塞的。

LinkedTransferQueue

其javadoc:
An unbounded {@link TransferQueue} based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.
Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.

总结一下:
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。先进先出。head元素是队列中最长的生产者,tail是队列中最短时间的生产者。
不同于其他的集合,size方法不是恒定不变的,因为队列是异步的,确定当前元素的数目需要去遍历该元素,在遍历的时候修改集合会对结果产生影响。
相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

图片.png

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

图片.png

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

/**
 * tryTransfer与transfer的区别就是一个有返回值,前者有线程等待则返回true并且将参数(元素)直接推送给消费端,没有消费者则直接返回false
 * 后者不返回这个标志
 */
public class LinkedTransferQueueTest {
    public static void main(String[] args) throws Exception{
        final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        //queue.transfer("bbb");

        new Thread(() -> {
            try {
                System.out.println("--------");
                String value = queue.take();
                System.out.println(value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(1000);
        //tryTransfer将元素立即传递给消费者,如果当前有消费者等待则返回true,如果没有则返回false
        boolean flag = queue.tryTransfer("aaa");
        System.out.println(flag);


        new Thread(() -> {
            System.out.println("---------");
            try {
                String value = queue.poll(4, TimeUnit.SECONDS);
                System.out.println(value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(3000);
        //Thread.sleep(10000);
        queue.transfer("bbb");
        System.out.println("-----end-------");
   }
}

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。

//由链表结构组成的双向阻塞队列
public class LinkedBlockingDequeTest {
    public static void main(String[] args) {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.addFirst("a");
        deque.addFirst("b");
        deque.addFirst("c");
        deque.addLast("e");
        deque.addFirst("f");

        System.out.println(deque);

        System.out.println(deque.getLast());
        System.out.println(deque.peekLast());

    }
}

以上BlockingQueue的实现。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,766评论 1 19
  • 一、并发 进程:每个进程都拥有自己的一套变量 线程:线程之间共享数据 1.线程 Java中为多线程任务提供了很多的...
    SeanMa阅读 2,375评论 0 11
  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,642评论 0 44
  • 【雪】 『一』 二O一六年 把伊望眼欲穿 那圣洁的倩影 不知扮美了谁的心田 而我 惟有梦乡见、心中念、忆海盼! 二...
    书山苍龙阅读 466评论 8 4
  • nmcli 上篇已经介绍了创建网卡配置,网卡切换等 在CentOS7里面如何实现多网卡绑定 也可以修改文件,这里是...
    数据革命阅读 992评论 0 0