三、线程安全策略
1、不可变对象
有一种对象发布了就是安全的,它就是不可变对象
在某些情况下,将不会修改的对象,设计成不可变对象,来让对象在多个线程间,是线程安全的。
1.1、创建不可变对象可以采用的方式
1、将类声明为final,使其不可继承
2、将所有成员声明为私有的,这样就不允许直接访问成员
3、对变量不提供setter方法
4、将所有可变的成员声明为final,这样只能对它们赋值一次
5、通过构造器初始化所有成员,进行深度拷贝,在getter方法中,不直接返回对象的本身,而是克隆对象,返回对象的拷贝
1.2、final
1、String类就是final修饰的类,在用final修饰类时要谨慎,除非这个类以后真的不会用来被继承或出于安全考虑,不然不要将类用fina修饰。final修饰类时,类中所有成员方法,都会隐式的被指定为final方法
2、在早期的java版本中,会将final修饰的方法转为内嵌调用
3、基本数据类型变量final修饰,数值初始化后便不可修改。引用类型的变量,初始化后,不能再指向另外一个对象private final static Integer a = 1; private final static String b = "2"; private final static Map<Integer, Integer> map = Maps.newHashMap(); static { map.put(1, 2); map.put(3, 4); map.put(5, 6); } public static void main(String[] args) { // a = 2; // b = "3"; // map = Maps.newHashMap(); map.put(1, 3); } private void test(final int a) { // a = 1; }
1.3、java除final可以定义不可变对象,还有其它可以定义不可变对象的方式呢
private static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
map = Collections.unmodifiableMap(map);
}
public static void main(String[] args) {
map.put(1, 3); //执行会抛出异常
}
private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);
private final static ImmutableSet set = ImmutableSet.copyOf(list);
private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1, 2, 3, 4);
private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
.put(1, 2).put(3, 4).put(5, 6).build();
public static void main(String[] args) {
// list.add(4); 抛出异常 UnsupportedOperationException
// set.add(4); 抛出异常
// map.put(5, 6);
}
2、线程封闭
躲避并发,除了不可变对象,还有线程封闭
线程封闭就是把对象封装到一个线程里,只有这一个线程能看到这个对象,这个对象就算是线程不安全的,也不会出现安全问题了,因为它只能在一个线程中进行访问
2.1、如何实现线程封闭
堆栈封闭,就是我们平时所用的局部变量,多个线程访问一个方法的时候,方法中的局部变量会被每个线程拷贝一份到线程栈中,所以局部变量是不会被多个线程所共享的,所以不会出现java并发问题。 所以能用局部变量就不用全局变量,全局变量(不是全局常量)容易引起并发问题
ThreadLocal 内部维护了一个map,map的key是每个线程的名称,map的值就是我们要封闭的对象。也就是说ThreadLocal利用map实现了对象的线程封闭
public class RequestHolder {
private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();
public static void add(Long id) {
requestHolder.set(id);
}
public static Long getId() {
return requestHolder.get();
}
public static void remove() {
requestHolder.remove();
}
}
public class HttpFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
log.info("do filter, {}, {}", Thread.currentThread().getId(), request.getServletPath());
RequestHolder.add(Thread.currentThread().getId());
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() {
}
}
public class HttpInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
log.info("preHandle");
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
RequestHolder.remove();
log.info("afterCompletion");
return;
}
}
2.2、线程封闭的应用
jdbc的Connection对象,Connection的实现并没有针对线程安全做太多的处理,jdbc的规范也没有要求Connection对象是线程安全的。实际中,线程从连接池获取了一个Connection对象,使用完后,再返回给连接池,由于大多数请求都是采用单线程同步的方式处理的;并且在Connection对象再返回连接池之前,系统不会把它分配给其它线程。所以这种处理方式,隐含的将Connection对象封闭在线程里面。
3、线程不安全类与写法
3.1、StringBuilder
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static StringBuilder stringBuilder = new StringBuilder();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", stringBuilder.length());
}
private static void update() {
stringBuilder.append("1");
}
StringBuffer 的append方法用了synchronized关键字做了同步处理
image.png
3.2、SimpleDateFormat
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update() {
try {
simpleDateFormat.parse("20180208"); //并发执行,结果抛出很多异常(多线程共享,容易发生异常)
} catch (Exception e) {
log.error("parse exception", e);
}
}
如何并发使用SimpleDateFormat
- 采用堆栈封闭
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update() {
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
simpleDateFormat.parse("20180208");
} catch (Exception e) {
log.error("parse exception", e);
}
}
- 使用org.joda.time.format.DateTimeFormat;
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update(int i) {
log.info("{}, {}", i, DateTime.parse("20180208", dateTimeFormatter).toDate());
}
3.3、HashMap
3.4、HashSet
3.5、ArrayList
3.6、不安全写法
//虽然if条件是原子操作,但条件和do something不是一个整体的原子操作,需要整体加锁
if(atomicInteger===1){
do something
}
4、同步容器
Vector和Stack、HashTable都是synchronized修饰
import java.util.Collections:
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
5、并发容器
- CopyOnWriteArrayList
CopyOnWriteArrayList:相比ArrayList 它是线程安全的,当有新元素添加到CopyOnWriteArrayList时,它先从原有的数组拷贝一份出来,在新的数组上进行写操作,写完之后再将原来的数组指向新的数组,CopyOnWriteArrayList它整个add()操作都是在锁的保护下进行的,这样是为了多线程并发时,拷贝出多个副本出来,把数据搞乱,导致最终的数组数据不是我们期望的
- 缺点:
1、写操作会拷贝数据,消耗内存。元素内容比较多的情况下会导致full gc
2、不能用于实时读的场景,拷贝数组、添加元素需要时间;虽然CopyOnWriteArrayList能做到最终一致性,但牺牲了实时一致性。所以CopyOnWriteArrayList适合读多,写少的场景- 优点:
1、CopyOnWriteArrayList读写分离,读是在原数组上读,写是另外开辟内存空间操作拷贝数组,保证最终一致性
2、一般情况下,数组的元素和长度都是不会非常大的,所以可以用CopyOnWriteArrayList替代ArrayList进行并发操作
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
- CopyOnWriteArraySet
原理类似于CopyOnWriteArrayList,只有add(),remove()是安全的,其它removeAll、addAll等是调用原来Set的方法,使用时,需要加锁同步。
- ConcurrentHashMap
是HashMap的安全版本, ConcurrentHashMap不允许空值,ConcurrentHashMap针对读操作做了大量优化,因此这个类具有特别高的并发性。
- ConcurrentSkipListMap
与HashMap相比,TreeMap是一个能比较元素大小的Map集合,会对传入的key进行了大小排序。其中,可以使用元素的自然顺序,也可以使用集合中自定义的比较器来进行排序;不同于HashMap的哈希映射,TreeMap实现了红黑树的结构,形成了一颗二叉树。
TreeMap具有如下特点:
- 不允许出现重复的key;
- 可以插入null键,null值;
- 可以对元素进行排序;
- 无序集合(插入和遍历顺序不一致);
ConcurrentSkipListMap是TreeMap的安全版本,是使用SkipList这种跳跃结构实现的。
有人拿ConcurrentHashMap和ConcurrentSkipListMap做了比较,在4线程,1.6W数据的条件下,ConcurrentHashMap 的存取速度是ConcurrentSkipListMap的4倍。
但是ConcurrentSkipListMap有几个ConcurrentHashMap不能比拟的优点:
- key是有序的
- ConcurrentSkipListMap支持更高的并发,存取时间几乎和线程数是没有关系的,在数据量一定的条件下,线程数越多,ConcurrentSkipListMap越能体现出它的优势
在单线程环境下,尽量使用
TreeMap
,少量线程可以用Collections.synchronizedSortMap(TreeMap)
,高并发场景下使用ConcurrentSkipListMap
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}