java中的并发容器

在讲并发容器之前,先看一个小案例,经典的多线程买票问题

(1)普通的思路来写,分析问题所在

  • 下面程序模拟卖票可能会出现两个问题:①票卖重了 ②还剩最后一张票时,好几个线程同时抢,出现-1张票

  • 出现上面两个问题主要是因为:①remove()方法不是原子性的 ②判断+操作不是原子性的


public class TicketSeller1 {

    static List<String> tickets = new ArrayList<>();

    static {

        for (int i=0; i<10000; i++) {  //共一万张票

            tickets.add("票编号--" + i);

        }

    }

    public static void main(String[] args) {

        for (int i=0; i<10; i++) {   //共10个线程卖票

            new Thread(()->{

                while(tickets.size() > 0) {  //判断余票

                    System.out.println("销售了..." + tickets.remove(0)); //操作减票

                }

            }).start();

        }

    }

}

(2)使用线程安全的容器Vector
本程序虽然用了Vector作为容器,Vector中的方法都是原子性的,但是在判断size和减票的中间还是可能被打断的,即被减到-1张


public class TicketSeller2 {

    static Vector<String> tickets = new Vector<>();  //Vector是一个同步容器

    static {

        for (int i=0; i<100; i++) tickets.add("票编号-" + i);

    }

 

    public static void main(String[] args) {

        for (int i=0; i<10; i++) {

            new Thread(()->{

                while(tickets.size() > 0) {  //判断余票

                    try {

                        Thread.sleep(500);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                    System.out.println("销售了--"+tickets.remove(0));  //操作减票

                }

            }).start();

        }

    }

}

(3)在判断和操作放在同步代码块中
将判断和操作外面加锁,程序完全没有功能上的问题,但是效率很低


public class TicketSeller3 {

    static List<String> tickets = new LinkedList<>();

    static {

        for (int i=0; i<100; i++) {  //共100张票

            tickets.add("票编号:" + i);

        }

    }

    public static void main(String[] args) {

        for (int i=0; i<10; i++) {   //共10个线程卖票

            new Thread(()->{

                while(true) {

                    synchronized (tickets) {

                        if (tickets.size() <= 0) break;  //判断 余票

                        try {

                            Thread.sleep(100);

                        } catch (InterruptedException e) {

                            e.printStackTrace();

                        }

                        System.out.println("销售了--" + tickets.remove(0)); //操作减票

                    }

                }

 

            }).start();

        }

    }

}

(4)使用队列(Queue)来实现

  • ConcurrentLinkedQueue底层不是加锁的实现,而是ConcurrentSet,效率会高很多。

  • Queue一开始不是空的。先poll,再判断tickets是不是空的,最后没有任何操作,所以不用加锁也不会出现任何问题

public class TicketSeller4 {

    static Queue<String> tickets = new ConcurrentLinkedQueue<>();

    static {

        for (int i=0; i<1000; i++) {

            System.out.println("票编号:" + i );

        }

    }

 

    public static void main(String[] args) {

        for (int i=0; i<10; i++) {

            new Thread( ()-> {

                while(true) {

                    String str = tickets.poll(); //poll方法是原子性的,拿出一张票

                    if(str == null) break;

                    else System.out.println("销售了.." + str);

                }

            }).start();

        }

    }

}

一、List和Map相关

1、ConcurrentHashMap

HashMap和HashTable的区别就是HashTable是线程安全的,支持并发操作,效率不够高,所有方法都加了锁,而HashMap线程不安全,实现相对简单,不支持并发操作。

  • JDK1.7中ConcurrentHashMap支持并发操作,采用ReentrantLock+Segment+HashEntry机制,整个 ConcurrentHashMap 由一个个 Segment 组成,需要经过两次hash运算,先定位到Segment,第二次定位到元素所在的头部。Segment 通过继承 ReentrantLock 来加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。concurrencyLevel(并行级别/并发数/Segment 数)默认是 16,即 ConcurrentHashMap 有 16 个 Segments,所以理论上,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。

  • JDK 1.8摒弃了1.7中的分段锁的概念,采用的是synchronized+CAS+HashEntry+红黑树。
    CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
    CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。
    Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。

2、ConcurrentSkipListMap和ConcurrentSkipSet

跳表(SkipList)是一种随机化的数据结构,通过“空间来换取时间”的一个算法,建立多级索引,实现以二分查找遍历一个有序链表。时间复杂度等同于红黑树,O(log n)。ConcurrentSkipListMap和ConcurrentSkipSet
是基于跳表实现的一种线程安全的并且可以排序的容器。插入时效率比较低,但查找时效率较高。

总结

  • 在不加锁的情况下,可以用:HashMap、TreeMap、LinkedHashMap。想加锁可以用Hashtable(用的非常少)。
  • 在并发量不是很高的情况下,可以用Collections.synchronizedXxx()方法,在该方法中传一个不加锁的容器(如Map),它返回一个加了锁的容器(容器中的所有方法加锁)!
  • 在并发性比较高的情况下,用ConcurrentHashMap ,如果并发性高且要排序的情况下,用ConcurrentSkipListMap。

3、CopyOnWriteArrayList

CopyOnWriteArrayList在多线程环境下,写时效率低,读时效率高,适合写少读多的环境,比如事件监听器。

  • 写时复制:添加元素的时候,会把这个容器复制一份,在复制的那份后面加一个新的,将引用指向复制的那份。
  • 读的时候不用加锁,适合写的很少,读的特别多的时候。
public class CopyOnWriteListTest {

    public static void main(String[] args) {

        List<String> list =

//                new ArrayList<>();  //这个会出并发问题,最后size<100000,,运行时间:0.1秒多

//                new Vector<>(); //size=100000,,运行时间:0.1秒多

                new CopyOnWriteArrayList<>(); //size=100000,效率很低,因为一直在“复制、写”,运行时间:5秒多

        Random r = new Random();

        Thread[] threads = new Thread[100];

        for (int i=0; i<threads.length; i++) {  //起100个线程,每个线程向容器中加1000个数(最终应该是10万个数)

            Runnable task = new Runnable() {

                @Override

                public void run() {

                    for (int j=0; j<1000; j++) list.add("a" + r.nextInt());

                }

            };

            threads[i] = new Thread(task);

        }

        runAndComputeTime(threads);

        System.out.println(list.size());

    }

    static void runAndComputeTime(Thread[] threads) {

        long start = System.currentTimeMillis();

        Arrays.asList(threads).forEach(t->t.start());

        Arrays.asList(threads).forEach(t->{

            try {

                t.join();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        });

        long end = System.currentTimeMillis();

        System.out.println(end-start);

    }

}

二、Queue队列相关

Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue

1、ConcurrentLinkedQueue 并发队列

  • 一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。
    *ConcurrentLinkedQueue是非阻塞队列,其没有put和take方法,可以无限制存,且是线程安全的,N个用户同时存也能保证每次存放在队尾而不乱掉,但是其的size()方法会不定时遍历,所以特耗时
public class ConcurrentQueue {

    public static void main(String[] args) {

        Queue<String> strs = new ConcurrentLinkedQueue<>();   //还有双端队列...Deque

        for (int i=0; i<10; i++) {

            //类似于add方法,如果是ArrayQueue,add方法可能会抛异常,但是offer方法不会抛异常,返回boolean类型即是否添加成功

            strs.offer("a"+i);

        }

        System.out.println(strs);  //[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]

        System.out.println("队列原始大小:" + strs.size());   //队列原始大小:10

        //poll方法表示从头上拿出一个删掉;peek方法表示从头上拿出一个用一下不删。

        System.out.println("poll " + strs.poll() + "后的大小为:" + strs.size()); //poll a0后的大小为:9

        System.out.println("peek " + strs.peek() + "后的大小为:" + strs.size()); //peek a1后的大小为:9

    }

}

2、 LinkedBlockingQueue 无界阻塞式队列

public class LinkedBlockingQueueTest {

    static BlockingQueue<String> strs = new LinkedBlockingQueue<>();

    static Random r = new Random();

    public static void main(String[] args) {

        new Thread(()->{  //1个生产者线程

            for (int i=0; i<100; i++) {

                try {

                    strs.put("a" + i);  //如果满了,就会等待

                    TimeUnit.MILLISECONDS.sleep(r.nextInt(100));

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        },"producer").start();

 

        for (int i=0; i<5; i++) {  //5个消费者进程

            new Thread(()-> {

                for (;;) {

                    try {

                        System.out.println(Thread.currentThread().getName()

                                + " take-" + strs.take()); //如果空了,就等待

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            },"customer"+i).start();

        }

    }

}

3、 ArrayBlockingQueue 有界阻塞式队列

/*有界阻塞式队列*/

public class ArrayBlockingQueueTest {

   static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //最多装10个

   static Random r = new Random();

 

    public static void main(String[] args) {

        for (int i=0; i<10; i++) {

            try {

                strs.put("a" + i);  //向容器中添加10个,就满了

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

        try { //strs已经满了,以下方法都加不进去,但是处理方式不同

            strs.put("aaa");//发现满了,就会等待,程序阻塞

            strs.add("aaa");  //已经满了,再往里面装就会报异常

            strs.offer("aaa");//不会报异常,但是加不进去,返回是否添加成功

            strs.offer("aaa",1,TimeUnit.SECONDS); //1秒钟后加不进去,就不往里面加了

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println(strs);

    }

}

4、 DelayQueue 执行定时任务

往DelayQueue里加的元素是按时间排好序的,该队列是无界的。另外元素要实现Delayed接口,而Delayed接口又继承了Comparable接口,所以该类元素需要实现compareTo()方法,用来对元素在队列中进行排序;并且每个元素记载着自己还有多长时间才能被拿走,还要实现getDelay()方法,只有getDelay()返回值为负数时才能被拿走。

public class DelayQueueTest {

    static DelayQueue<MyTask> tasks = new DelayQueue<>();

    static class MyTask implements Delayed {  //实现Delayed接口

        long runningTime;

        String name;

        MyTask(long rt,String name) {

            this.runningTime = rt;

            this.name = name;

        }
        @Override

        public long getDelay(TimeUnit unit) { 

            return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);

        }
        @Override

        public int compareTo(Delayed o) {

            if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))

                return -1;

            else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))

                return 1;

            else  // ==

            return 0;

        }
        @Override

        public String toString() {

            return name + "--" + runningTime;

        }

    }
    public static void main(String[] args) {

        long now = System.currentTimeMillis();

        MyTask t1 = new MyTask(now + 1000, "task1"); //1 s 后执行 //②

        MyTask t2 = new MyTask(now + 2000, "task2"); //2 s后执行  //④

        MyTask t3 = new MyTask(now + 1500, "task3"); //1.5s后执行 //③

        MyTask t4 = new MyTask(now + 500, "task4");  //0.5s后执行 //①

        MyTask t5 = new MyTask(now + 2500, "task5"); //2.5s后执行 //⑤
        tasks.put(t1);

        tasks.put(t2);

        tasks.put(t3);

        tasks.put(t4);

        tasks.put(t5);

        System.out.println(tasks);

        for (int i=0; i<5; i++) {

            try {

                System.out.println(tasks.take()); //按放进去的顺序拿出

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}

5、 TransferQueue(接口)
消费者先启动,生产者生产一个东西的时候,不扔在队列里,而是直接去找有没有消费者,有的话直接扔给消费者,若没有消费者线程,调用transfer()方法就会阻塞,调用add()、offer()、put()方法不会阻塞。
LinkedTransferQueue为TransferQueue接口的实现类

public class TransferQueueTest {

    public static void main(String[] args) throws InterruptedException {

        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();

        new Thread(()->{ //消费者先启动,可以拿走aaa

            try {

                System.out.println(strs.take());

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }).start();

        strs.transfer("aaa");

//        strs.put("aaaa");  //add、offer

//        new Thread(()->{ //消费者在生产者后启动,拿不到aaa,程序阻塞

//            try {

//                System.out.println(strs.take());

//            } catch (InterruptedException e) {

//                e.printStackTrace();

//            }

//        }).start();

    }

}

5、 SynchronousQueue
SynchronousQueue为一种特殊的TransferQueue,实现了BlockingQueue接口,生产的任何一个东西必须直接交给消费者消费,不能搁在容器里,容器的容量为0。因此,调用add()会报错,offer()始终为false,poll()始终为null。调用put()方法,如果没有消费者,则线程阻塞,该方法内部调用了TransferQueue的transfer()方法。

public class SynchronizeQueueTest {

    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> strs = new SynchronousQueue<>();

 

        new Thread(()->{  //消费者线程

            try {

                System.out.println(strs.take());

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }).start();

        strs.put("aaaa"); //不能调用add(报错),add不进去,put阻塞,等待消费者消费,内部调用的transfer.

        System.out.println(strs.size());  //0

    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342