对多线程的理解
- 进程是操作系统进行资源分配的基本单位,而线程是操作系统进行调度的基本单位;
- 多线程是随着多核CPU的出行,才实现了真正意义的并行执行;
- 好处:资源利用率更高、更快、有时候设计更简单;
- 代价:设计更复杂(共享内存、线程同步)、活跃性(死锁、饥饿等)、性能问题(上下文频繁切换、占内存)。
1️⃣线性安全性
- 定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要额外的同步或协同,这个类都能表现出正确行为,那么就称这个类是线程安全的。
- 在线程安全类中封装了必要的同步机制,因此客户端无须进一步采取同步措施。
- 无状态的对象一定是线程安全的。
@ThreadSafe
public class StatelessFactorizer extends GenericServlet implements Servlet {
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
BigInteger[] factors = factor(i);
encodeIntoResponse(resp, factors);
}
}
- 原子性:提供互斥访问,同一时刻只能有一个线程对其操作;
-
Atomic包
1.1 AtomicInteger
@Slf4j
@ThreadSafe
public class AtomicExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}
// AtomicInteger
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// Unsafe
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
1.2 AtomicLong和LongAdder对比
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
- jvm对long,double这些64位的变量拆成两个32位的操作;
- LongAdder的设计思想:核心是将热点数据分离,将内部数据value分成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行技术,而最终计数结果为这个数组的求和累加,其中热点数据value会被分离成多个热点单元的数据cell,每个cell独自维护内部的值,当前value的实际值由所有的cell累积合成,从而使热点进行了有效的分离,提高了并行度。LongAdder 在低并发的时候通过直接操作base,可以很好的保证和Atomic的性能基本一致,在高并发的场景,通过热点分区来提高并行度;
- 缺点:在统计的时候如果有并发更新,可能会导致结果有些误差。
1.3 AtomicReference、AtomicIntegerFieldUpdater
@Slf4j
@ThreadSafe
public class AtomicExample4 {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
count.compareAndSet(0, 2); // 2
count.compareAndSet(0, 1); // no
count.compareAndSet(1, 3); // no
count.compareAndSet(2, 4); // 4
count.compareAndSet(3, 5); // no
log.info("count:{}", count.get());
}
}
@Slf4j
@ThreadSafe
public class AtomicExample5 {
private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
@Getter
public volatile int count = 100;//字段需要volatile
public static void main(String[] args) {
AtomicExample5 example5 = new AtomicExample5();
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 1, {}", example5.getCount());
}
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 2, {}", example5.getCount());
} else {
log.info("update failed, {}", example5.getCount());
}
}
}
1.4 AtomicStampedReference:CAS的ABA问题
@Slf4j
@ThreadSafe
public class AtomicExample8 {
private static AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19, 0);
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
final int stamp = money.getStamp();
new Thread() {
public void run() {
while (true) {
while (true) {
Integer m = money.getReference();
if (m < 20) {
System.out.println("...");
if (money.compareAndSet(m, m + 20, stamp, stamp + 1)) {
System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
break;
}
} else {
System.out.println("余额大于20元,无需充值");
break;
}
}
}
}
}.start();
}
new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
while (true) {
Integer m = money.getReference();
int stamp = money.getStamp();
if (m > 10) {
if (money.compareAndSet(m, m - 20, stamp, stamp + 1)) {
System.out.println("成功消费10元,余额:" + money.getReference() + "元");
break;
}
} else {
System.out.println("没有足够的金额");
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
}.start();
}
}
1.5 AtomicIntegerArray
@Slf4j
@ThreadSafe
public class AtomicExample9 {
private static int a[] = {1, 2, 3};
private static AtomicIntegerArray arr = new AtomicIntegerArray(a);
public static void main(String[] args) {
arr.compareAndSet(1, 2, 5);
System.out.println(arr);//[1, 5, 3]
}
}
1.6 AtomicBoolean
@Slf4j
@ThreadSafe
public class AtomicExample6 {
private static AtomicBoolean isHappened = new AtomicBoolean(false);
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
test();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappened:{}", isHappened.get());
}
private static void test() {
if (isHappened.compareAndSet(false, true)) {
log.info("execute");
}
}
}
- synchronized
- synchronized:依赖JVM (主要依赖JVM实现锁,因此在这个关键字作用对象的作用范围内,都是同一时刻只能有一个线程进行操作的);
- Lock:依赖特殊的CPU指令,代码实现,ReentrantLock.
修饰内容分类:
- 作用于调用的对象:修饰代码块或方法;
- 作用于所有的对象:修饰静态方法或修饰类括号里的部分。
@Slf4j
public class SynchronizedExample1 {
// 修饰一个代码块
public void test1(int j) {
synchronized (this) {
for (int i = 0; i < 5; i++) {
log.info("test1 {} - {}", j, i);
}
}
}
// 修饰一个方法
public synchronized void test2(int j) {
for (int i = 0; i < 5; i++) {
log.info("test2 {} - {}", j, i);
}
}
public static void main(String[] args) {
SynchronizedExample1 example1 = new SynchronizedExample1();
SynchronizedExample1 example2 = new SynchronizedExample1();
ExecutorService executorService = Executors.newCachedThreadPool();
//顺序
// executorService.execute(() -> {
// example1.test2(1);
// });
// executorService.execute(() -> {
// example1.test2(2);
// });
//乱序
// executorService.execute(() -> {
// example1.test1(1);
// });
// executorService.execute(() -> {
// example2.test1(2);
// });
//乱序
executorService.execute(() -> {
example1.test2(1);
});
executorService.execute(() -> {
example2.test2(2);
});
}
}
@Slf4j
public class SynchronizedExample2 {
// 修饰一个类
public static void test1(int j) {
synchronized (SynchronizedExample2.class) {
for (int i = 0; i < 10; i++) {
log.info("test1 {} - {}", j, i);
}
}
}
// 修饰一个静态方法
public static synchronized void test2(int j) {
for (int i = 0; i < 10; i++) {
log.info("test2 {} - {}", j, i);
}
}
public static void main(String[] args) {
SynchronizedExample2 example1 = new SynchronizedExample2();
SynchronizedExample2 example2 = new SynchronizedExample2();
ExecutorService executorService = Executors.newCachedThreadPool();
//顺序
executorService.execute(() -> {
example1.test2(1);
});
executorService.execute(() -> {
example2.test2(2);
});
//顺序
// executorService.execute(() -> {
// example1.test1(1);
// });
// executorService.execute(() -> {
// example2.test1(2);
// });
}
}
@Slf4j
@ThreadSafe
public class CountExample3 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private synchronized static void add() {
count++;
}
}
- Lock
todolist。。。
实现原子性的3种方式对比:
- synchronized不可中断锁,适合竞争不激烈,可读性好;
- Lock可中断锁,多样化同步,竞争激烈时能维持常态;
- Atomic竞争激烈时能维持常态,比Lock性能好,只能同步一个值。
- 可见性
- 导致共享变量在线程不可见的原因:
- 线程交叉执行;
- 重排序结合线程交叉执行;
- 共享变量更新后的值没有在工作内存与主内存间及时更新。
- java提供了synchronized和volatile 两种方法来确保可见性
2.1 JMM(java内存模型)关于synchronized的两条规定:
- 线程解锁前,必须把共享变量的最新值刷新到主内存;
- 线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,加锁和解锁是同一把锁)
2.2 可见性-volatile:通过加入 内存屏障和禁止重排序优化来实现:
- 对volatile 变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存;
- 对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量
@Slf4j
@NotThreadSafe
public class CountExample4 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static volatile int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
/**
* 本质上应该是这个方法线程不安全
*
* volatile只能保证 1,2,3的顺序不会被重排序
* 但是不保证1,2,3的原子执行,也就是说还是有可能有两个线程交叉执行1,导致结果不一致
*/
private static void add() {
count++;
// 1、count
// 2、+1
// 3、count
}
}
- volatile使用条件
- 对变量写操作不依赖于当前值
- 该变量没有包含在具有其他变量的不必要的式子中
- volatile特别适合用来做线程标记量,如下图:
- 有序性
- Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序的过程不会影响到单线程程序的执行,却会影响多线程并发执行的正确性;
- 保证有序性的方法:volatile、synchronized、Lock
happens-before原则:
- 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;
- 锁定规则:一个unLock操作先行发生于后面对同一个锁额lock操作;
- volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;
- 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
- 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作;
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
- 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行;
- 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始;
2️⃣安全发布对象
- 发布对象
- 发布对象:使一个对象能够被当前范围之外的代码所使用;
- 对象溢出:一种错误的发布。当一个对象还没构造完成时,就是它被其他对象可见。
@Slf4j
@NotThreadSafe
public class UnsafePublish {
//不安全的发布
private String[] states = {"a", "b", "c"};
/**
* 通过public发布级别发布了类的域,在类的外部,任何线程都可以访问这个域
* 这样是不安全的,因为我们无法检查其他线程是否会修改这个域导致了错误
* @return
*/
public String[] getStates() {
return states;
}
public static void main(String[] args) {
UnsafePublish unsafePublish = new UnsafePublish();
log.info("{}", Arrays.toString(unsafePublish.getStates()));
unsafePublish.getStates()[0] = "d";
log.info("{}", Arrays.toString(unsafePublish.getStates()));
}
}
@Slf4j
@NotThreadSafe
@NotRecommend
public class Escape {
private int thisCanBeEscape = 0;
public Escape () {
new InnerClass();
}
/**
* 对象溢出
* 在对象构造完成之前,不可以将其发布
* 使用工厂方法和私有构造函数来完成对象创建和监听器的注册来避免不正确的发布
*/
private class InnerClass {
public InnerClass() {
log.info("{}", Escape.this.thisCanBeEscape);
}
}
public static void main(String[] args) {
new Escape();
}
}
- 安全发布对象的4种方法
- 在静态初始化函数中,初始化一个对象引用;
- 将对象的引用保存到volatile类型域或者AtomicReference对象中;
- 将对象的引用保存到某个正确构造对象的final类型域中;
- 将对象的引用保存到一个由锁保护的域中。
/**
* 懒汉模式
* 单例实例在第一次使用时进行创建
*/
@NotThreadSafe
public class SingletonExample1 {
// 私有构造函数
private SingletonExample1() {
}
// 单例对象
private static SingletonExample1 instance = null;
// 静态的工厂方法
public static SingletonExample1 getInstance() {
if (instance == null) {
instance = new SingletonExample1();
}
return instance;
}
}
/**
* 饿汉模式
* 单例实例在类装载时进行创建
* 缺点 1.如果创建过程中进行很多的运算,会导致类加载的时候特别的慢
* 2.如果创建出来的实例要很久以后才被调用,那么会导致资源的浪费
*/
@ThreadSafe
public class SingletonExample2 {
// 私有构造函数
private SingletonExample2() {
}
// 单例对象
private static SingletonExample2 instance = new SingletonExample2();
// 静态的工厂方法
public static SingletonExample2 getInstance() {
return instance;
}
}
/**
* 懒汉模式
* 单例实例在第一次使用时进行创建
*/
@ThreadSafe
@NotRecommend
public class SingletonExample3 {
// 私有构造函数
private SingletonExample3() {
}
// 单例对象
private static SingletonExample3 instance = null;
// 静态的工厂方法
public static synchronized SingletonExample3 getInstance() {
if (instance == null) {
instance = new SingletonExample3();
}
return instance;
}
}
/**
* 懒汉模式 -》 双重同步锁单例模式
* 单例实例在第一次使用时进行创建
*/
@NotThreadSafe
public class SingletonExample4 {
// 私有构造函数
private SingletonExample4() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// JVM和cpu优化,发生了指令重排
// 1、memory = allocate() 分配对象的内存空间
// 3、instance = memory 设置instance指向刚分配的内存
// 2、ctorInstance() 初始化对象
// 单例对象
private static SingletonExample4 instance = null;
// 静态的工厂方法
public static SingletonExample4 getInstance() {
if (instance == null) { // 双重检测机制 // B
synchronized (SingletonExample4.class) { // 同步锁
if (instance == null) {
instance = new SingletonExample4(); // A - 3
}
}
}
return instance;
}
}
/**
* 懒汉模式 -》 双重同步锁单例模式
* 单例实例在第一次使用时进行创建
*/
@ThreadSafe
public class SingletonExample5 {
// 私有构造函数
private SingletonExample5() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// 单例对象 volatile + 双重检测机制 -> 禁止指令重排
private volatile static SingletonExample5 instance = null;
// 静态的工厂方法
public static SingletonExample5 getInstance() {
if (instance == null) { // 双重检测机制 // B
synchronized (SingletonExample5.class) { // 同步锁
if (instance == null) {
instance = new SingletonExample5(); // A - 3
}
}
}
return instance;
}
}
/**
* 饿汉模式
* 单例实例在类装载时进行创建
*/
@ThreadSafe
public class SingletonExample6 {
// 私有构造函数
private SingletonExample6() {
}
// 单例对象
private static SingletonExample6 instance = null;
static {
instance = new SingletonExample6();
}
// 静态的工厂方法
public static SingletonExample6 getInstance() {
return instance;
}
public static void main(String[] args) {
System.out.println(getInstance().hashCode());
System.out.println(getInstance().hashCode());
}
}
/**
* 枚举模式:最安全
*/
@ThreadSafe
@Recommend
public class SingletonExample7 {
// 私有构造函数
private SingletonExample7() {
}
public static SingletonExample7 getInstance() {
return Singleton.INSTANCE.getInstance();
}
private enum Singleton {
INSTANCE;
private SingletonExample7 singleton;
// JVM保证这个方法绝对只调用一次
Singleton() {
singleton = new SingletonExample7();
}
public SingletonExample7 getInstance() {
return singleton;
}
}
}
3️⃣ 线程安全的策略
- 不可变的对象
不可变对象需要满足的条件
- 对象创建以后其状态就不能修改;
- 对象所有域都是final类型;
- 对象是正确创建的(在对象创建期间,this引用没有逸出)。
创建不可变对象的方式(参考String类型)
- 将类声明成final类型,使其不可以被继承;
- 将所有的成员设置成私有的,使其他的类和对象不能直接访问这些成员;
- 对变量不提供set方法;
- 将所有可变的成员声明为final,这样只能对他们赋值一次;
- 通过构造器初始化所有成员,进行深度拷贝;
- 在get方法中,不直接返回对象本身,而是克隆对象,返回对象的拷贝。
关于深拷贝和浅拷贝:https://segmentfault.com/a/1190000010648514
final关键字:类、方法、变量
- 修饰类:不能被继承(final类中的所有方法都会被隐式的声明为final方法);
- 修饰方法:1、锁定方法不被继承类修改;2、提升效率(private方法被隐式修饰为final方法);
- 修饰变量:基本数据类型变量(初始化之后不能修改)、引用类型变量(初始化之后不能再修改其引用)。
@Slf4j
@NotThreadSafe
public class ImmutableExample1 {
private final static Integer a = 1;
private final static String b = "2";
private final static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
}
public static void main(String[] args) {
// a = 2;
// b = "3";
// map = Maps.newHashMap();
map.put(1, 3);
log.info("{}", map.get(1));
}
private void test(final int a) {
// a = 1;
}
}
@Slf4j
@ThreadSafe
public class ImmutableExample2 {
private static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
map = Collections.unmodifiableMap(map);
}
public static void main(String[] args) {
map.put(1, 3);//java.lang.UnsupportedOperationException
log.info("{}", map.get(1));
}
}
@ThreadSafe
public class ImmutableExample3 {
private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);
private final static ImmutableSet set = ImmutableSet.copyOf(list);
private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1, 2, 3, 4);
private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
.put(1, 2).put(3, 4).put(5, 6).build();
public static void main(String[] args) {
System.out.println(map2.get(3));
}
}
- 线程封闭:把对象封装到一个线程里,只有这个线程能看到这个对象
- Ad-hoc 线程封闭:程序控制实现,最糟糕,忽略
- 堆栈封闭:局部变量,无并发问题
- ThreadLocal 线程封闭:特别好的封闭方法
例子:ThreadLocal 实例保存登录用户信息。
public class RequestHolder {
private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();
public static void add(Long id) {
requestHolder.set(id);
}
public static Long getId() {
return requestHolder.get();
}
public static void remove() {
requestHolder.remove();
}
}
@SpringBootApplication
public class ConcurrencyApplication extends WebMvcConfigurerAdapter{
public static void main(String[] args) {
SpringApplication.run(ConcurrencyApplication.class, args);
}
@Bean
public FilterRegistrationBean httpFilter() {
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(new HttpFilter());
registrationBean.addUrlPatterns("/threadLocal/*");
return registrationBean;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/**");
}
}
@Slf4j
public class HttpFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
log.info("do filter, {}, {}", Thread.currentThread().getId(), request.getServletPath());
RequestHolder.add(Thread.currentThread().getId());
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() {
}
}
@Slf4j
public class HttpInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
log.info("preHandle");
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
RequestHolder.remove();
log.info("afterCompletion");
return;
}
}
@Controller
@RequestMapping("/threadLocal")
public class ThreadLocalController {
@RequestMapping("/test")
@ResponseBody
public Long test() {
return RequestHolder.getId();
}
}
- 线程不安全类与写法
3.1 StringBuilder 线程不安全,StringBuffer线程安全
@Slf4j
@NotThreadSafe
public class StringExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static StringBuilder stringBuilder = new StringBuilder();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", stringBuilder.length());
}
private static void update() {
stringBuilder.append("1");
}
}
StringBuffer几乎所有的方法都加了synchronized关键字,以保证线程安全
@Slf4j
@ThreadSafe
public class StringExample2 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static StringBuffer stringBuffer = new StringBuffer();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", stringBuffer.length());
}
private static void update() {
stringBuffer.append("1");
}
}
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}
3.2 SimpleDateFormat->joda.DateTimeFormatter
- SimpleDateFormat 在多线程共享使用的时候回抛出转换异常,应该才用堆栈封闭在每次调用方法的时候在方法里创建一个SimpleDateFormat;
- 另一种方式是使用joda-time的DateTimeFormatter(推荐使用)
@Slf4j
@NotThreadSafe
public class DateFormatExample1 {
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update() {
try {
//java.lang.NumberFormatException: For input string: ""
simpleDateFormat.parse("20180208");
} catch (Exception e) {
log.error("parse exception", e);
}
}
}
@Slf4j
@ThreadSafe
public class DateFormatExample2 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update() {
try {
//用堆栈封闭在每次调用方法的时候在方法里创建一个SimpleDateFormat;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
simpleDateFormat.parse("20180208");
} catch (Exception e) {
log.error("parse exception", e);
}
}
}
@Slf4j
@ThreadSafe
public class DateFormatExample3 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update(int i) {
log.info("{}, {}", i, DateTime.parse("20180208", dateTimeFormatter).toDate());
}
}
3.3 ArrayList,HashMap,HashSet等Collections
3.4 先检查再执行
// 非原子性
if(condition(a)){
handle(a);
}
- Java同步容器
4.1 ArrayList->Vector
- vector的所有方法都是有synchronized关键字保护的;
- stack继承了vector,并且提供了栈操作(先进后出).
@Slf4j
@ThreadSafe
public class VectorExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = new Vector<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
4.2 HashMap -> HashTable(由synchronized关键字保护)
@Slf4j
@ThreadSafe
public class HashTableExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new Hashtable<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
addEntry(hash, key, value, index);
return null;
}
4.3 Collections.synchronizedXXX (list,set,map)
@Slf4j
@ThreadSafe
public class CollectionsExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
@Slf4j
@ThreadSafe
public class CollectionsExample2 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
@Slf4j
@ThreadSafe
public class CollectionsExample3 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
4.4 同步容器不一定线性安全
@NotThreadSafe
public class VectorExample2 {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < vector.size(); i++) {
//java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 16
vector.get(i);
}
}
};
thread1.start();
thread2.start();
}
}
}
public class VectorExample3 {
//注意:2.在foreach或迭代器遍历的过程中不要做删除操作,应该先标记,然后最后再统一删除
// java.util.ConcurrentModificationException
private static void test1(Vector<Integer> v1) { // foreach
for(Integer i : v1) {
if (i.equals(3)) {
v1.remove(i);
}
}
}
// java.util.ConcurrentModificationException
private static void test2(Vector<Integer> v1) { // iterator
Iterator<Integer> iterator = v1.iterator();
while (iterator.hasNext()) {
Integer i = iterator.next();
if (i.equals(3)) {
v1.remove(i);
}
}
}
// success
private static void test3(Vector<Integer> v1) { // for
for (int i = 0; i < v1.size(); i++) {
if (v1.get(i).equals(3)) {
v1.remove(i);
}
}
}
public static void main(String[] args) {
Vector<Integer> vector = new Vector<>();
vector.add(1);
vector.add(2);
vector.add(3);
test3(vector);
}
}
- Java并发容器J.U.C
5.1 ArrayList -> CopyOnWriteArrayList
https://blog.csdn.net/mazhimazh/article/details/19210547
@Slf4j
@ThreadSafe
public class CopyOnWriteArrayListExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
//适合读多写少的场景;不太适合实时读的场景
private static List<Integer> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
5.2 HashSet -> CopyOnWriteArraySet、TreeSet -> ConcurrentSkipListSet
HashMap -> ConcurrentHashMap、TreeMap -> ConcurrentSkipListMap
@Slf4j
@ThreadSafe
public class CopyOnWriteArraySetExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Set<Integer> set = new CopyOnWriteArraySet<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
安全共享对象策略总结:
- 线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改;
- 共享只读:一个共享只读对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它;
- 线程安全对象:一个线程安全的对象或容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它;
- 被守护对象:只能通过获取特定的锁来访问。
4️⃣ J.U.C之AQS
4.1 CountDownLatch:同步阻塞类,完成阻塞线程的功能
- 应用1:父任务等待所有子任务都完成的时候,在继续往下进行;
- 应用2:有多个线程完成一个任务,但是这个任务只想给他一个指定的时间,超过这个任务就不继续等待了。
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
4.2 Semaphore
使用场景:仅能提供有限访问的资源:比如数据库的连接数最大只有20,而上层的并发数远远大于20,这时候如果不做限制,可能会由于无法获取连接而导致并发异常,这时候可以使用Semaphore来进行控制,当信号量设置为1的时候,就和单线程很相似了。
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
@Slf4j
public class SemaphoreExample2 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
@Slf4j
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
@Slf4j
public class SemaphoreExample4 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
4.3 CyclicBarrier
原理:同步辅助类,允许一组线程相互等待,知道所有线程都准备就绪后,才能继续操作,当某个线程调用了await方法之后,就会进入等待状态,并将计数器-1,直到所有线程调用await方法使计数器为0,才可以继续执行,由于计数器可以重复使用,所以我们又叫他循环屏障。
- 使用场景1:多线程计算数据,最后合并计算结果的应用场景,比如用Excel保存了用户的银行流水,每一页保存了一个用户近一年的每一笔银行流水,现在需要统计用户的日均银行流水,这时候我们就可以用多线程处理每一页里的银行流水,都执行完以后,得到每一个页的日均银行流水,之后通过CyclicBarrier的action,利用这些线程的计算结果,计算出整个excel的日均流水;
- CyclicBarrier与CountDownLatch区别:
CyclicBarrier可以重复使用(使用reset方法),CountDownLatch只能用一次;
CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景。
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
4.3 ReentrantLock
ReentrantLock实现原理:自旋锁,循环调用CAS操作来实现加锁,避免了使线程进入内核态的阻塞状态。想办法阻止线程进入内核态的阻塞状态,是我们分析和理解锁的关键钥匙。
对比维度 | synchronized | ReentrantLock |
---|---|---|
可重入性(进入锁的时候计数器自增1) | 可重入 | 可重入 |
锁的实现 | JVM实现,很难操作源码 | JDK实现 |
性能 | 在引入轻量级锁后性能大大提升,建议都可以选择的时候选择synchornized | |
功能区别 | 方便简洁,由编译器负责加锁和释放锁 | 手工操作 |
粗粒度,不灵活 | 细粒度,可灵活控制 | |
可否指定公平所 | 不可以 | 可以 |
可否放弃锁 | 不可以 | 可以 |
ReentrantLock独有的功能:
- 可指定是公平锁还是非公平锁;
- 提供了一个Condition类,可以分组唤醒需要唤醒的线程;
- 提供能够中断等待锁的行程的机制,lock.lockInterruptibly();
@Slf4j
@ThreadSafe
public class LockExample2 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static int count = 0;
private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
@Slf4j
public class LockExample6 {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
log.info("send signal ~ "); // 3
reentrantLock.unlock();
}).start();
}
}
4.4 ReentrantReadWriteLock
在没有任何读写锁的时候才能取得写入的锁,可用于实现悲观读取,读多写少的场景下可能会出现线程饥饿。
@Slf4j
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
class Data {
}
}
4.5 StampedLock
StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。
所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此不悲观地使用完全的读取锁定,程序可以查看读取资料之后,是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常) ,这一个小小改进,可大幅度提高程序的吞吐量!!
public class LockExample4 {
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
}
@Slf4j
@ThreadSafe
public class LockExample5 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static int count = 0;
private final static StampedLock lock = new StampedLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
}
总结:
- synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控
- synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定;
- ReentrantLock、ReentrantReadWriteLock,、StampedLock都是对象层面的锁定,要保证锁定一定会被释放,就必须将unLock()放到finally{}中;
- StampedLock 对吞吐量有巨大的改进,特别是在读线程越来越多的场景下;
- StampedLock有一个复杂的API,对于加锁操作,很容易误用其他方法;
- 当只有少量竞争者的时候,synchronized是一个很好的通用的锁实现;
- 当线程增长能够预估,ReentrantLock是一个很好的通用的锁实现;
- https://my.oschina.net/benhaile/blog/264383
5️⃣ J.U.C组件拓展
5.1 Callable、Future、Future
@Slf4j
public class FutureExample {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
@Slf4j
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}
https://www.cnblogs.com/dolphin0520/p/3949310.html
5.2 ForkJoinPool
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
具体原理参考:http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/#%E5%8E%9F%E7%90%86
5.3 BlockingQueue
6️⃣ 线程池
6.1 new Thread的弊端
- 每次new Thread新建对象,性能差;
- 线程缺乏统一管理,可能无限制地创建线程,相互竞争,有可能占用过多系统资源导致死机或OOM;
- 缺少更多功能,如更多执行、定期执行、线程中断。
6.2 线程池的好处
- 重用存在的线程,减少对象创建、消亡的开销,性能佳;
- 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞;
- 提供定期、定时执行、单线程、并发数控制等。
6.3 ThreadPoolExecutor线程池参数
- corePoolSize:核心线程数量;
- maximumPoolSize:线程最大线程数;
- workQueue:阻塞队列,存储等待执行的任务,会对线程池运行过程产生重大影响;
- keepAliveTime:线程没有任务执行时,最多保持多久时间终止;
- unit:keepAliveTime的时间单位;
- threadFactory:线程工程,用来创建线程;
- handler:当拒绝处理任务时的策略。
6.4 ThreadPoolExecutor常用方法
- execute():提交任务,交给线程池执行;
- submit(): 提交任务,能返回执行结果,execute+Future;
- shutdown():关闭线程池,等待任务都执行完;
- shutdownNow():关闭线程池,不等待任务执行完;
- getTaskCount():线程池已执行和未执行的线程数量;
- getCompletedTaskCount():已完成的任务数量;
- getPoolSize():线程池当前的线程数量;
-
getActiveCount():当前线程池中,正在执行任务的线程数量。
6.4 线程池配置
- CPU密集型:NCPU + 1;
- IO密集型:2 * NCPU。
@Slf4j
public class ThreadPoolExample1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}
}
@Slf4j
public class ThreadPoolExample2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}
}
@Slf4j
public class ThreadPoolExample3 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}", index);
}
});
}
executorService.shutdown();
}
}
@Slf4j
public class ThreadPoolExample4 {
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// executorService.schedule(new Runnable() {
// @Override
// public void run() {
// log.warn("schedule run");
// }
// }, 3, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 1, 3, TimeUnit.SECONDS);
// executorService.shutdown();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
log.warn("timer run");
}
}, new Date(), 5 * 1000);
}
}
7️⃣ 多线程并发拓展
死锁必要条件:
- 互斥条件;
- 请求和保持条件;
- 不剥夺条件;
- 环路等待条件;
/**
* 一个简单的死锁类
* 当DeadLock类的对象flag==1时(td1),先锁定o1,睡眠500毫秒
* 而td1在睡眠的时候另一个flag==0的对象(td2)线程启动,先锁定o2,睡眠500毫秒
* td1睡眠结束后需要锁定o2才能继续执行,而此时o2已被td2锁定;
* td2睡眠结束后需要锁定o1才能继续执行,而此时o1已被td1锁定;
* td1、td2相互等待,都需要得到对方锁定的资源才能继续执行,从而死锁。
*/
@Slf4j
public class DeadLock implements Runnable {
public int flag = 1;
//静态对象是类的所有对象共享的
private static Object o1 = new Object(), o2 = new Object();
@Override
public void run() {
log.info("flag:{}", flag);
if (flag == 1) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (o2) {
log.info("1");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (o1) {
log.info("0");
}
}
}
}
public static void main(String[] args) {
DeadLock td1 = new DeadLock();
DeadLock td2 = new DeadLock();
td1.flag = 1;
td2.flag = 0;
//td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。
//td2的run()可能在td1的run()之前运行
new Thread(td1).start();
new Thread(td2).start();
}
}
多线程并发最佳实践:
- 使用本地变量;
- 使用不可变类;
- 最小化锁的作用范围:S = 1/(1-a+a/n);
- 使用线程池的Executor,而不是直接new Thread()执行;
- 宁可使用同步,也不要使用线程的wait和notify;
- 使用BlockingQueue实现生产-消费模式;
- 使用并发集合而不是加了锁的同步集合;
- 使用Semaphore创建有界的访问;
- 宁可使用同步代码块,也不使用同步的方法;
- 避免使用同步变量。
Spring与线程安全:
- Spring bean:singleton、prototype;
- 无状态对象;
HashMap与ConcurrentHashMap: