阻塞队列初步实现

1.condition的作用是什么?

配合lock实现生产者消费者模式,阻塞、唤醒线程达到线程通信目的

2.尝试用condition去实现一个阻塞队列

随便写的 注重思想
来个思路:维持两个等待队列,一个等着生产,一个等着消费
放入时满了阻塞生产,唤醒消费
获取时空了阻塞消费,唤醒生产
首先,面向接口编程,来个接口

public interface MyBlockQueue<E> {
    void put(E e) throws InterruptedException;
    E take() throws InterruptedException;
}

来个实现

public class MyArrayBlockQueue<E>  implements MyBlockQueue<E>,Serializable {
    private static final long serialVersionUID = 8683452581122892189L;
    transient E[] elementData;
    private static final int MAX_ARRAY_SIZE = 2147483639;
    Lock lock;
    Condition notEmpty;
    Condition notFull;
    int count;

    public MyArrayBlockQueue(int var1) {
        if (var1 > 0) {
            this.elementData = (E[])new Object[var1];
        } else {
            if (var1 != 0) {
                throw new IllegalArgumentException("Illegal Capacity: " + var1);
            }
        }
            this.elementData = (E[])new Object[var1];
            lock = new ReentrantLock();
            notEmpty = lock.newCondition();
            notFull = lock.newCondition();

    }

    public MyArrayBlockQueue() {
        this.elementData = (E[])new Object[0];
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    @Override
    public void put(E e) throws InterruptedException {
        try {
            lock.lock();
            while(count == MAX_ARRAY_SIZE){
                notFull.await();
            }
            elementData[count++] = e;
            notEmpty.signal();
        System.out.println("put:"+toString());  
        }finally {
            lock.unlock();
        }
    }

    @Override
    public E take() throws InterruptedException {
        try{
            lock.lock();
            while(count == 0){
                notEmpty.await();
            }
            E e = elementData[0];
            forword(elementData);
            count--;
       notFull.signal();
           System.out.println("take:"+toString());
            return e;
        }finally {
            lock.unlock();
        }

    }

    private void forword(E[] elementData) {
        for (int i = 1; i < elementData.length; i++) {
            elementData[i-1] = elementData[i];
        }
    }

    @Override
    public String toString() {
        return "MyArrayBlockQueue{" +
                "elementData=" + Arrays.toString(elementData) +
                '}';
    }
}

生产者

public class PutThread  implements Runnable{
    private MyBlockQueue<String> queue;
    public PutThread(MyBlockQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while(true){
            try {
                queue.put(UUID.randomUUID().toString().substring(0,5));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者

public class TakeThread implements Runnable {

    private MyBlockQueue<String> queue;

    public TakeThread(MyBlockQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while(true){
            try {
                queue.take();
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

来个测试

public class Test {
    public static void main(String[] args) throws InterruptedException {
        MyBlockQueue<String> queue = new MyArrayBlockQueue<>(5);
        for (int i = 0; i < 5; i++) {
            new Thread(new PutThread(queue)).start();
            new Thread(new TakeThread(queue)).start();
        }
    }
}

来个效果图


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容