在讲并发容器之前,先看一个小案例,经典的多线程买票问题
(1)普通的思路来写,分析问题所在
下面程序模拟卖票可能会出现两个问题:①票卖重了 ②还剩最后一张票时,好几个线程同时抢,出现-1张票
出现上面两个问题主要是因为:①remove()方法不是原子性的 ②判断+操作不是原子性的
public class TicketSeller1 {
static List<String> tickets = new ArrayList<>();
static {
for (int i=0; i<10000; i++) { //共一万张票
tickets.add("票编号--" + i);
}
}
public static void main(String[] args) {
for (int i=0; i<10; i++) { //共10个线程卖票
new Thread(()->{
while(tickets.size() > 0) { //判断余票
System.out.println("销售了..." + tickets.remove(0)); //操作减票
}
}).start();
}
}
}
(2)使用线程安全的容器Vector
本程序虽然用了Vector作为容器,Vector中的方法都是原子性的,但是在判断size和减票的中间还是可能被打断的,即被减到-1张
public class TicketSeller2 {
static Vector<String> tickets = new Vector<>(); //Vector是一个同步容器
static {
for (int i=0; i<100; i++) tickets.add("票编号-" + i);
}
public static void main(String[] args) {
for (int i=0; i<10; i++) {
new Thread(()->{
while(tickets.size() > 0) { //判断余票
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了--"+tickets.remove(0)); //操作减票
}
}).start();
}
}
}
(3)在判断和操作放在同步代码块中
将判断和操作外面加锁,程序完全没有功能上的问题,但是效率很低
public class TicketSeller3 {
static List<String> tickets = new LinkedList<>();
static {
for (int i=0; i<100; i++) { //共100张票
tickets.add("票编号:" + i);
}
}
public static void main(String[] args) {
for (int i=0; i<10; i++) { //共10个线程卖票
new Thread(()->{
while(true) {
synchronized (tickets) {
if (tickets.size() <= 0) break; //判断 余票
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了--" + tickets.remove(0)); //操作减票
}
}
}).start();
}
}
}
(4)使用队列(Queue)来实现
ConcurrentLinkedQueue底层不是加锁的实现,而是ConcurrentSet,效率会高很多。
Queue一开始不是空的。先poll,再判断tickets是不是空的,最后没有任何操作,所以不用加锁也不会出现任何问题
public class TicketSeller4 {
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
for (int i=0; i<1000; i++) {
System.out.println("票编号:" + i );
}
}
public static void main(String[] args) {
for (int i=0; i<10; i++) {
new Thread( ()-> {
while(true) {
String str = tickets.poll(); //poll方法是原子性的,拿出一张票
if(str == null) break;
else System.out.println("销售了.." + str);
}
}).start();
}
}
}
一、List和Map相关
1、ConcurrentHashMap
HashMap和HashTable的区别就是HashTable是线程安全的,支持并发操作,效率不够高,所有方法都加了锁,而HashMap线程不安全,实现相对简单,不支持并发操作。
JDK1.7中ConcurrentHashMap支持并发操作,采用ReentrantLock+Segment+HashEntry机制,整个 ConcurrentHashMap 由一个个 Segment 组成,需要经过两次hash运算,先定位到Segment,第二次定位到元素所在的头部。Segment 通过继承 ReentrantLock 来加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。concurrencyLevel(并行级别/并发数/Segment 数)默认是 16,即 ConcurrentHashMap 有 16 个 Segments,所以理论上,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
JDK 1.8摒弃了1.7中的分段锁的概念,采用的是synchronized+CAS+HashEntry+红黑树。
CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。
Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。
2、ConcurrentSkipListMap和ConcurrentSkipSet
跳表(SkipList)是一种随机化的数据结构,通过“空间来换取时间”的一个算法,建立多级索引,实现以二分查找遍历一个有序链表。时间复杂度等同于红黑树,O(log n)。ConcurrentSkipListMap和ConcurrentSkipSet
是基于跳表实现的一种线程安全的并且可以排序的容器。插入时效率比较低,但查找时效率较高。
总结
- 在不加锁的情况下,可以用:HashMap、TreeMap、LinkedHashMap。想加锁可以用Hashtable(用的非常少)。
- 在并发量不是很高的情况下,可以用Collections.synchronizedXxx()方法,在该方法中传一个不加锁的容器(如Map),它返回一个加了锁的容器(容器中的所有方法加锁)!
- 在并发性比较高的情况下,用ConcurrentHashMap ,如果并发性高且要排序的情况下,用ConcurrentSkipListMap。
3、CopyOnWriteArrayList
CopyOnWriteArrayList在多线程环境下,写时效率低,读时效率高,适合写少读多的环境,比如事件监听器。
- 写时复制:添加元素的时候,会把这个容器复制一份,在复制的那份后面加一个新的,将引用指向复制的那份。
- 读的时候不用加锁,适合写的很少,读的特别多的时候。
public class CopyOnWriteListTest {
public static void main(String[] args) {
List<String> list =
// new ArrayList<>(); //这个会出并发问题,最后size<100000,,运行时间:0.1秒多
// new Vector<>(); //size=100000,,运行时间:0.1秒多
new CopyOnWriteArrayList<>(); //size=100000,效率很低,因为一直在“复制、写”,运行时间:5秒多
Random r = new Random();
Thread[] threads = new Thread[100];
for (int i=0; i<threads.length; i++) { //起100个线程,每个线程向容器中加1000个数(最终应该是10万个数)
Runnable task = new Runnable() {
@Override
public void run() {
for (int j=0; j<1000; j++) list.add("a" + r.nextInt());
}
};
threads[i] = new Thread(task);
}
runAndComputeTime(threads);
System.out.println(list.size());
}
static void runAndComputeTime(Thread[] threads) {
long start = System.currentTimeMillis();
Arrays.asList(threads).forEach(t->t.start());
Arrays.asList(threads).forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
二、Queue队列相关
Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue
1、ConcurrentLinkedQueue 并发队列
- 一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。
*ConcurrentLinkedQueue是非阻塞队列,其没有put和take方法,可以无限制存,且是线程安全的,N个用户同时存也能保证每次存放在队尾而不乱掉,但是其的size()方法会不定时遍历,所以特耗时
public class ConcurrentQueue {
public static void main(String[] args) {
Queue<String> strs = new ConcurrentLinkedQueue<>(); //还有双端队列...Deque
for (int i=0; i<10; i++) {
//类似于add方法,如果是ArrayQueue,add方法可能会抛异常,但是offer方法不会抛异常,返回boolean类型即是否添加成功
strs.offer("a"+i);
}
System.out.println(strs); //[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
System.out.println("队列原始大小:" + strs.size()); //队列原始大小:10
//poll方法表示从头上拿出一个删掉;peek方法表示从头上拿出一个用一下不删。
System.out.println("poll " + strs.poll() + "后的大小为:" + strs.size()); //poll a0后的大小为:9
System.out.println("peek " + strs.peek() + "后的大小为:" + strs.size()); //peek a1后的大小为:9
}
}
2、 LinkedBlockingQueue 无界阻塞式队列
public class LinkedBlockingQueueTest {
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(()->{ //1个生产者线程
for (int i=0; i<100; i++) {
try {
strs.put("a" + i); //如果满了,就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"producer").start();
for (int i=0; i<5; i++) { //5个消费者进程
new Thread(()-> {
for (;;) {
try {
System.out.println(Thread.currentThread().getName()
+ " take-" + strs.take()); //如果空了,就等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"customer"+i).start();
}
}
}
3、 ArrayBlockingQueue 有界阻塞式队列
/*有界阻塞式队列*/
public class ArrayBlockingQueueTest {
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //最多装10个
static Random r = new Random();
public static void main(String[] args) {
for (int i=0; i<10; i++) {
try {
strs.put("a" + i); //向容器中添加10个,就满了
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try { //strs已经满了,以下方法都加不进去,但是处理方式不同
strs.put("aaa");//发现满了,就会等待,程序阻塞
strs.add("aaa"); //已经满了,再往里面装就会报异常
strs.offer("aaa");//不会报异常,但是加不进去,返回是否添加成功
strs.offer("aaa",1,TimeUnit.SECONDS); //1秒钟后加不进去,就不往里面加了
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(strs);
}
}
4、 DelayQueue 执行定时任务
往DelayQueue里加的元素是按时间排好序的,该队列是无界的。另外元素要实现Delayed接口,而Delayed接口又继承了Comparable接口,所以该类元素需要实现compareTo()方法,用来对元素在队列中进行排序;并且每个元素记载着自己还有多长时间才能被拿走,还要实现getDelay()方法,只有getDelay()返回值为负数时才能被拿走。
public class DelayQueueTest {
static DelayQueue<MyTask> tasks = new DelayQueue<>();
static class MyTask implements Delayed { //实现Delayed接口
long runningTime;
String name;
MyTask(long rt,String name) {
this.runningTime = rt;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else // ==
return 0;
}
@Override
public String toString() {
return name + "--" + runningTime;
}
}
public static void main(String[] args) {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask(now + 1000, "task1"); //1 s 后执行 //②
MyTask t2 = new MyTask(now + 2000, "task2"); //2 s后执行 //④
MyTask t3 = new MyTask(now + 1500, "task3"); //1.5s后执行 //③
MyTask t4 = new MyTask(now + 500, "task4"); //0.5s后执行 //①
MyTask t5 = new MyTask(now + 2500, "task5"); //2.5s后执行 //⑤
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i=0; i<5; i++) {
try {
System.out.println(tasks.take()); //按放进去的顺序拿出
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
5、 TransferQueue(接口)
消费者先启动,生产者生产一个东西的时候,不扔在队列里,而是直接去找有没有消费者,有的话直接扔给消费者,若没有消费者线程,调用transfer()方法就会阻塞,调用add()、offer()、put()方法不会阻塞。
LinkedTransferQueue为TransferQueue接口的实现类
public class TransferQueueTest {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
new Thread(()->{ //消费者先启动,可以拿走aaa
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
// strs.put("aaaa"); //add、offer
// new Thread(()->{ //消费者在生产者后启动,拿不到aaa,程序阻塞
// try {
// System.out.println(strs.take());
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
}
}
5、 SynchronousQueue
SynchronousQueue为一种特殊的TransferQueue,实现了BlockingQueue接口,生产的任何一个东西必须直接交给消费者消费,不能搁在容器里,容器的容量为0。因此,调用add()会报错,offer()始终为false,poll()始终为null。调用put()方法,如果没有消费者,则线程阻塞,该方法内部调用了TransferQueue的transfer()方法。
public class SynchronizeQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();
new Thread(()->{ //消费者线程
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaaa"); //不能调用add(报错),add不进去,put阻塞,等待消费者消费,内部调用的transfer.
System.out.println(strs.size()); //0
}
}