自己实现无锁高并发队列

1.简介

我们程序常常在并发场景下使用,所以我们常常使用支持并发的数据结构,在java中有很多自带的java支持高并发的数据结构,在java.util.concurrent包下,基本由Doug Lea 编写。这次我们自己实现一个无锁高并发队列

2. 基本原理

基本原理还是使用java.concurrent.atomic 包下的原子类,结合最终一致性即可

3. 实现

package liusheng.main;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

/**
 * 使用的是原子类
 *
 * @param <E>
 */
public class ConcurrentQueue<E> {
    static class Node<E> {
        E e;
        volatile Node<E> next;

        public Node(E e) {
            this.e = e;
        }
    }

    AtomicReference<Node<E>> head = new AtomicReference<>();
    AtomicReference<Node<E>> last = new AtomicReference<>();
    AtomicInteger size = new AtomicInteger();

    public ConcurrentQueue() {
        Node<E> node = new Node<E>(null);
        head.set(node);
        last.set(node);
    }

    public void offer(E e) {
        Node<E> node = new Node<>(e);
        Node<E> pre = null;
        do {
            pre = last.get();
        } while (!last.compareAndSet(pre, node));
        pre.next = node;
        size.getAndIncrement();
    }

    public E poll() {
        Node<E> node, node1;
        do {
            node = head.get();
            node1 = node.next;
        } while (node1 != null && !head.compareAndSet(node, node1));
        if (node1 != null) {
            size.decrementAndGet();
            return node1.e;
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            ConcurrentQueue<Long> queue = new ConcurrentQueue<>();
            IntStream.range(0, 10).boxed().map(i -> (Runnable) () -> {
                for (int j = 0; j < 1000; j++) {
                    queue.offer(System.currentTimeMillis());
                }
            }).forEach(executorService::execute);
            AtomicInteger a = new AtomicInteger();
            IntStream.range(0, 10).boxed().map(i -> (Runnable) () -> {
                for (int j = 0; j < 1000; j++) {
                    if (queue.poll() != null) {
                        a.getAndIncrement();
                    }
                }
            }).forEach(executorService::execute);
            executorService.shutdown();
            executorService.awaitTermination(2, TimeUnit.DAYS);
            // System.out.println(a.get() + queue.size.get());
            assert a.get() + queue.size.get() == 10000;
        }
    }
}

4 总结

我们自己实现了一个无锁高并发的队列,可以加深我们对原子类的理解和使用,同时我们在以后看别人的实现,也容易了很多,也加深的了我对多线程的理解。有什么错误的地方望大佬指正

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容