jdk 多线程并发案例

需求: 设计一个容器,最大容器值为10,有生产者和消费者

  • synchronized方式
 public class MyContainer<T> {

   final private List<T> lists = new LinkedList<>();
   final private int MAX = 10; // 容器最大允许10个元素

   public synchronized void put(T t) {
       while (MAX == lists.size()) {
           try {
               System.out.println("队列满了...");
               // wait 99%和 while 一起使用
               this.wait();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }


       try {
           TimeUnit.SECONDS.sleep(1);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       lists.add(t);
       System.out.println("put值=>" + t);

       // 永远使用notifyAll 提示消费者我已经增加了一个元素
       this.notifyAll();
   }

   public synchronized T get() {
       T t;
       while (0 == lists.size()) {
           try {
               System.out.println("队列清空了...");
               this.wait();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
       try {
           TimeUnit.SECONDS.sleep(10);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       t = lists.get(0);
       lists.remove(0);

       this.notifyAll();
       return t;
   }

   public static void main(String[] args) {
       MyContainer<String> myContainer = new MyContainer<>();

       new Thread(() -> {
           for (int i = 0; i < 10000; i++) {
               myContainer.put("add "+i);
           }
       }, "t1").start();

       new Thread(() -> {
           for (int i = 0; i < 10000; i++) {
               System.out.println("get值=>" + myContainer.get());
           }
       }, "t2").start();
   }
}
  • ReentrantLock方式
public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public void put(T t) {
        try {
            lock.lock();
            while (MAX == lists.size()) {
                System.out.println("队列满...");
                producer.await();
            }
            TimeUnit.SECONDS.sleep(1);
            System.out.println("put value =>" + t);
            lists.add(t);
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }


    public T get() {
        T t = null;
        try {
            lock.lock();
            while (0 == lists.size()) {
                System.out.println("队列空...");
                consumer.await();
            }
            TimeUnit.SECONDS.sleep(3);
            t = lists.removeFirst();
            producer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }

    public static void main(String[] args) {
        MyContainer2<String> myContainer2 = new MyContainer2<>();

        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                myContainer2.put("value " + i );
            }
        }).start();

        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                System.out.println("get value =>"+myContainer2.get());
            }
        }).start();
    }
}

volatile和ThreadLocal

  • volalile 线程变量(状态值)可见(共享),非原子
public class ThreadLocal1 {
    /**
     * volatile 线程可见
     * ThreadLocal 线程副本
     */
    volatile Person p = new Person();


    public static void main(String[] args) {
        ThreadLocal1 th = new ThreadLocal1();


        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(th.p.name);
        },"t2").start();


        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            th.p.name = "lisi";
        },"t2").start();
    }

}

class Person{
    String name = "zhangsan";
}


+ThreadLocal线程副本,变量不共享

/**
 * ThreadLocal 使用空间换时间
 * synchronized 使用时间换空间
 * 所以ThreadLocal效率高些。
 */
public class ThreadLocal2 {

    final static ThreadLocal<Person> th = new ThreadLocal<>();

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(th.get());
        },"t1").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            th.set(new Person());
        },"t2").start();
    }

}

class Person1 {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

需求:有N张火车票,每张都具有一个编号。同时有10个窗口对外售票,编写一个模拟小程序。

  • synchronized方式

/**
 * 有N张火车票,每张都具有一个编号。
 * 同时有10个窗口对外售票
 * 编写一个模拟程序
 *
 * 分析下面的程序可能会产生哪些问题?
 * 重复销售?超量销售
 */
public class TicketSell01 {
    private static final List<String> lists = new LinkedList<>();

    // 初始化1000张票
    static {
        for (int i = 0; i < 1000; i++) {
            lists.add("票"+i);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                synchronized (TicketSell01.class) {
                    while (lists.size() > 0) {
                        String ele = lists.remove(0);
                        System.out.println("卖掉--" + ele);
                    }
                }
            }).start();
        }
    }
}
  • queen方法,效率高
public class TicketSell02 {
    final static Queue<String> tickets = new ConcurrentLinkedQueue<>();

    static {
        for (int i = 0; i < 1000; i++) {
            tickets.add("票 编号"+i);
        }
    }

    public static void main(String[] args) {
        for (int i=0; i < 10; i++) {
            new Thread(()->{
                while (tickets.size() > 0) {
                    // poll 是 采用硬件的CAS 的机制实现原子性 比synchronized效率高。
                    String ticket = tickets.poll();
                    System.out.println(ticket +  "已卖出...");
                }
            },"t1").start();
        }
    }
}

高并发容器比较(Map)

public class ConcurrentHashMap01 {

    public static void main(String[] args) {
        /**
         * 950ms左右 1.8以前采用的是segment分段加锁(默认值是16)
         * 1.8以以上版本采用CAS(compare and set 在set之前预期值和实际值比较,相同则set,不同得线程等待,直到和预期值相同才更新),
         * 硬件层面保证原子性
         *
         * 适用于无序高并发
         */
//        Map<String,String> map = new ConcurrentHashMap<>();

        /**
         * skiplist 跳表
         * 是有序的 查询效率高
         * duration: 1729ms
         *
         * 适用于有序高并发
         */
        Map<String,String> map = new ConcurrentSkipListMap<>();

        /**
         * 1700ms 左右
         * 通过ObjectOutputStream流来保证原子性,因此效率没有CAS(解释见ConcurrentHashMap注释)高
         */
//    Map<String,String> map = new Hashtable<>();
        // 如果要加锁,使用Collections.synchronizedMap(new HashMap<>())
        /**
         * duration: 1887ms 不稳定 900ms - 1800ms 差不多
         */
//        Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
        // TreeMap
        Random r = new Random();
        Thread[] ths = new Thread[100];

        CountDownLatch latch = new CountDownLatch(ths.length);
        long start = System.currentTimeMillis();

        for (int i = 0; i < ths.length; i++) {
            ths[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    map.put("k" + r.nextInt(1000000), "v" + r.nextInt(100000));
                }
                latch.countDown();
            });
        }

        Arrays.asList(ths).forEach(Thread::start);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();

        // 101ms
        System.out.println("duration: " + (end - start) + "ms");
    }


}

高并发容器比较(array)

  • ArrayList/Vector/CopyOnWriteArrayList
public class CopyOnWrite01 {
    public static void main(String[] args) {
        List<String> lists =
                // 线程不安全 duration: 121 ms, size: 98792 条
                new ArrayList<>();
        /**
         * 查看源码:
         * Save the state of the {@code Vector} instance to a stream (that
         * is, serialize it).
         * This method performs synchronization to ensure the consistency
         * of the serialized data.
         * private void writeObject(java.io.ObjectOutputStream s)
         *
         * 线程安全 通过 ObjectOutputStream序列化的方式 保证原子性 duration: 68 ms, size: 100000 条
         *
         */
//                new Vector<>();
                // 特点 写慢 读快 应用场景: 字典表缓存等 duration: 5275 ms, size: 100000 条
//                new CopyOnWriteArrayList<>();
        Random r = new Random();
        Thread[] ths = new Thread[100];

        for (int i = 0; i < ths.length; i++) {
            ths[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    lists.add("a" + r.nextInt(100000));
                }
            });
        }
        long s1 = System.currentTimeMillis();

        List<Thread> threads = Arrays.asList(ths);
        // 启动线程
        threads.forEach(Thread::start);
        // 等待所有线程完成
        threads.forEach((t) -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long s2 = System.currentTimeMillis();
        System.out.printf("duration: %s ms, size: %d 条",(s2 - s1), lists.size());
    }
}

  • SynchronizedList
public class SynchronizedList01 {

    public static void main(String[] args) {
        List<String> strs = new ArrayList<>();

        //传入ArrayList,返回 加了锁(synchronized)的list
        List<String> strsSync = Collections.synchronizedList(strs);
    }
}
附源码,在每个list方法上面添加了synchronized
        public void add(int index, E element) {
          synchronized (mutex) {list.add(index, element);}
        }
  • ConcurrentLinkedQueue和ConcurrentLinkedDeque
public class ConcurrentQueen {

    public static void main(String[] args) {
        // linkedQueen 无界队列
        Queue<String> strs = new ConcurrentLinkedQueue<>();

        for (int i = 0; i < 10; i++) {
            // add offer的区别 如果是容量有限的容器 add加入null会抛异常;offer不会,会有一个boolean类型的返回值。
            strs.offer("a" + i);
        }

        System.out.println(strs);

        System.out.println(strs.size());

        System.out.println(strs.poll());

        System.out.println(strs.size());

        // peek 和 poll 都会返回首个元素 不同的是peek会删除 poll不会删除元素
        System.out.println(strs.peek());
        System.out.println(strs.peek());
        System.out.println(strs.size());

        System.out.println("双端队列...");
        // 双端队列 Deque
        Deque<String> deque = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < 10; i++) {
            deque.offerLast("b" + i);
        }

        System.out.println(deque);
        System.out.println(deque.getFirst());
        System.out.println(deque.size());

        System.out.println(deque.pollLast());
        System.out.println(deque);

        System.out.println(deque.peekLast());
        System.out.println(deque.peekFirst());
        System.out.println(deque);

    }
}
  • BlockingQueue 阻塞队列
    • 无解队列 LinkedBlockingQueue
        public class BlockingQueen01 {
    
      /**
       * 阻塞队列
       */
      static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
    
    
      static Random r = new Random();
    
      public static void main(String[] args) {
          // 启动一个线程生产
          new Thread(() -> {
              for (int i = 0; i < 100; i++) {
                  try {
                      // 如果队列满了,则等待
                      strs.put("a" + i);
    
                      TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }, "t1").start();
    
    
          // 启动五个线程消费
          for (int i = 0; i < 5; i++) {
              new Thread(() -> {
                  while (true) {
                      try {
                          // 如果空了,就等待
                          System.out.println(Thread.currentThread().getName() + " take - " + strs.take());
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              },"t2-"+i).start();
          }
      }
    
    

}

  • 有界队列 ArrayBlockingQueue(10)
public class BlockingQueue02 {

    static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

    static Random r = new Random();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strs.put("a" + i);
        }

        System.out.println(strs);
        // 满了就会等待,程序阻塞
//        strs.put("aaa");

        // 满了就会返回一个boolean
//        System.out.println(strs.offer("aaa"));
        // 满了就会抛出异常
//        strs.add("aaa");

        // 满了,就会等待(阻塞)10s,然后返回boolean
        System.out.println(strs.offer("aaa", 10, TimeUnit.SECONDS));

    }
}
  • DelayQueue

public class BlockingQueue03 {

    // 应用场景:定时任务
    static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    public static void main(String[] args) {
        long cur = System.currentTimeMillis();
        tasks.offer(new MyTask(cur + 2000));
        tasks.offer(new MyTask(cur + 1500));
        tasks.offer(new MyTask(cur + 10000));
        tasks.offer(new MyTask(cur + 2300));
        tasks.offer(new MyTask(cur + 500));


        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println(tasks.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
}

class MyTask implements Delayed {

    long runningTime;

    MyTask(long rt) {
        this.runningTime = rt;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(runningTime - System.currentTimeMillis(),unit);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1;
        else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1;
        else return 0;
    }
}
  • LinkedTransferQueue
public class TransferQueue01 {

    public static void main(String[] args) {
        // 用于更高的高并发情况
        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();

//        new Thread(() -> {
//            try {
//                System.out.println(strs.take());
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }).start();

//        try {
//            strs.transfer("aaa");
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }

        strs.put("aaa");

        new Thread(() -> {
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


    }
}
  • SynchronousQueue
// 容量为0 特殊的 transterqueue 生产的东西必须立马消费
        BlockingQueue<String> strs = new SynchronousQueue<>();

        new Thread(() -> {

            try {
                // take 阻塞等待生产者生产
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 阻塞等待消费者消费
        strs.put("aaa");
        strs.add("aaa");

总结:

总结:

  1. 对于map/set的使用
  • hashmap 散列
  • treemap 树状
  • linkedhashmap 链式
  • hashtable 线程安全(通过ObjectOuputStream序列化)
  • Collections.sychronizedXXX (给非线程安全的map加sychronized锁)
  • concurrenthashmap (CAS(操作系统 compare and set 预期值与实际值比较,相等set,不相等直到相等set)锁)
  • concurrentskiplistmap()
  1. list
  • ArrayList
  • LinkedList
  • Collections.synchronizedxxx
  • Queue
    • ConcurrentLinkedQueue
    • BlockingQueue
      • LinkedBQ
      • ArrayBQ
      • TransferQueue
      • SynchrousQueue
    • DelayQueue 定时任务
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • 一、基础知识:1、JVM、JRE和JDK的区别:JVM(Java Virtual Machine):java虚拟机...
    杀小贼阅读 2,362评论 0 4
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,662评论 0 12
  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,629评论 2 17
  • 本系列出于AWeiLoveAndroid的分享,在此感谢,再结合自身经验查漏补缺,完善答案。以成系统。 Java基...
    济公大将阅读 1,523评论 1 6
  • java笔记第一天 == 和 equals ==比较的比较的是两个变量的值是否相等,对于引用型变量表示的是两个变量...
    jmychou阅读 1,483评论 0 3