1、CountDownLatch 倒计时器
package com.multithreadtool;
import lombok.SneakyThrows;
import java.util.concurrent.CountDownLatch;
/**
* 1.countdownlatch实现join的功能
* 2.等待多线程完成再一起往下执行
* notice:countdownlatch不可以重新初始化或者修改对象的内部计数器的值
*/
public class CountDownLatchTest01 {
static CountDownLatch cdl = new CountDownLatch(2);
@SneakyThrows
public static void main(String[] args) {
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
cdl.countDown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1 over....");
}).start();
new Thread(() -> {
System.out.println(2);
cdl.countDown();
}).start();
//notice:调用await方法时不会阻塞当前线程,只要计数器的值数完了,那么主线程就会继续往下执行,而不会管其他的线程是否执行完成,如上22行
cdl.await();
System.out.println(3);
}
}
2、CyclicBarrier 循环栅栏
package com.multithreadtool;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 循环栅栏:让一组线程达到一个屏障(同步点)时被阻塞,直到最后一个线程达到屏障时,屏障蔡会开门,
* 所有被屏障拦截的线程蔡会继续运行
* 应用场景:可以用于多线程计算数据,最后合并计算结果的场景。
*/
public class CyclicBarrierTest01 {
static CyclicBarrier cb = new CyclicBarrier(2);
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
new Thread(() -> {
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
//cb.await()表示当前的线程已经达到了目标位置,再此等待其他线程
System.out.println(1);
}).start();
cb.await();
System.out.println(2);
}
/**
* countdownlatch 与 cyclicbarrier 的区别
* 1.countdownlatch计数器中只能使用一次,cyclicbarrier可以调用reset()重置(在发生异常的时侯重试)
* 2.cyclicbarrier还提供getNumberWaiting() 获取阻塞线程数量 isBroken()了解阻塞的线程是否被中断
*/
}
3、Semaphore 信号量
package com.multithreadtool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* 信号量:用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
* 示例场景:马路上始终保证只有100辆车,剩下的只能等待,只有待马路上的车离开了,才会放进相应数量的车
* 需求:读取数万个文件的数据,启动30个线程读取,然后保存到数据库中,数据库的连接数为10
* 这个时侯必须控制只有10个线程同时获取数据库连接保存数据,否则会报无法获取数据库连接错误。
*/
public class SemaphoreTest01 {
public static final int THREAD_COUNT = 30;
public static ExecutorService threadPoool = Executors.newFixedThreadPool(THREAD_COUNT);
//创建一个信号量
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
threadPoool.execute(() -> {
try {
//1.首先使用acquire()方法获取一个许可证
System.out.println("信号量的许可证数量:" + semaphore.availablePermits());
Thread.sleep(2000);
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " save data ...");
//2.用完了许可证进行归还
semaphore.release();
});
}
threadPoool.shutdown();
}
}
4、Exchanger
package com.multithreadtool;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程间交换数据的Exchanger:是一个用于线程间协作的工具类,用于进行线程间的数据交换
* 它提供一个同步点,在这个同步点,两个线程可以表换彼此的数据。在两个线程通过exchange方法交换数据
* 如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都达到
* 同步点时,两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
* <p>
* 使用场景:1、遗传算法 2、校对工作
*/
public class ExchangerTest01 {
public static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
System.out.println("A thread 开始执行");
String A = "银行流水A";
try {
Thread.sleep(5000);
//exchange()中传入的是本线程给出去的值,返回值是其他线程的值
//notice:如果两个线程有一个没有执行exchange(),则会一直等待,为了避免可以使用带超时的重载方法
String B = exgr.exchange(A);
System.out.println("A线程中发现B录入的是B:" + B + " 是否一致:" + A.equals(B));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPool.execute(() -> {
System.out.println("B thread 开始执行");
String B = "银行流水B";
try {
String A = exgr.exchange(B);
System.out.println("B线程中发现A录入的是A:" + A + " 是否一致:" + A.equals(B));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。