为什么要使用多线程
既然所有的多线程代码都可以用单线程来实现,为何还要多线程?
提高CPU的利用率,提升程序处理能力
计算机多核多线程时代的来临,意味着,同一时刻,计算机可以干两件事!
有一些业务场景的实现,用多线程实现更容易,并且更贴切实际
同一个进程中:生产者、消费者
两个人在打游戏,同时扔出一个大招
不要让后来的任务等待太久,大家平均点(用户请求)
我们一直在用的多线程
用户同时访问网站,我们Java服务用servlet进行处理。对于每个用户请求,都是用一个独立的线程来处理的。
线程创建的方式
继承Thread
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("I'm thread code");
}
}
// 测试
public static void main(String[] args) {
new MyThread().start();
}
实现Runnable
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("I'm runnable code");
}
}
// 测试
public static void main(String[] args) {
new Thread(new MyRunnable()).start();
}
实现Callable<T>
有返回值的线程任务
结合线程池来使用
public class MyCallable implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
System.out.println("this is call .... ");
// 线程睡眠5秒
TimeUnit.SECONDS.sleep(5);
return true;
}
}
// 测试
public static void main(String[] args) {
Callable<Boolean> myCallable = new MyCallable();
// 需要用线程池来提交Callable任务
Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(myCallable);
try {
Boolean aBoolean = submit.get();//5秒后得到结果,在这之前主线程将在这阻塞
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
另一种方式,可以结合Thread来使用
public class MyCallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
// 计算1...100的和
for (int i = 1; i <= 100; i++) {
sum += i;
}
TimeUnit.SECONDS.sleep(5);
return sum;
}
}
// 测试
public static void main(String[] args) {
MyCallableTask myCallableTask = new MyCallableTask();
// 一个有未来结果的任务
FutureTask<Integer> futureTask = new FutureTask<>(myCallableTask);
new Thread(futureTask).start();
try {
System.out.println("开始等待...");
Integer sum = futureTask.get();//主线程代码执行到这里会阻塞,直到Callable任务有返回
System.out.println(sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
使用线程池
public class MyTreadPool {
public static void main(String[] args) {
// 推荐
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 6, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy());
Runnable runnable1 = new Runnable() {
@Override
public void run() {
System.out.println("I'm ThreadPool code");
}
};
// 往线程池添加任务,并且开始执行
pool.execute(runnable1);
// 不推荐
ExecutorService executorService = Executors.newCachedThreadPool();
Runnable runnable2 = new Runnable() {
@Override
public void run() {
System.out.println("I'm executorService code");
}
};
// 往线程池添加任务,并且开始执行
executorService.execute(runnable2);
}
}
不要直接使用new Thread创建线程
因为你的new Thread
代码被多少并发执行是不确定的,避免线程资源被耗尽拖垮服务。
线程池
推荐使用ThreadPoolExecutor
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 6, 6, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),// 有界
new ThreadPoolExecutor.CallerRunsPolicy());
说明
参数说明
1、核心线程数;
2、最大线程数;
3、线程保持存活的时间;
4、线程保持存活的时间单位;
5、阻塞队列;
常用:ArrayBlockingQueue
与 LinkedBlockingQueue
对比一下
ArrayBlockingQueue
实现简单,表现稳定,添加和删除使用同一个锁,通常性能不如后者;LinkedBlockingQueue
添加和删除两把锁是分开的,所以竞争会小一些;
6、拒绝策略方案;
细则
1、线程池中线程的创建,是由添加执行任务触发的;
2、如果此刻有空闲的线程来处理当前任务,就不会创建新的线程;
3、核心线程也是按需一个一个创建的,并非一次性创建;
4、当任务超出核心线程负载,则会将该任务放置阻塞 队列中;
5、如果任务超出了阻塞队列,则会创建非核心线程来处理任务;
6、如果任务队列已满,并且任务超出了线程池最大线程负载,则会执行拒绝策略;
7、非核心线程超出保活时间,无任务可以处理,则会被销毁掉;
8、核心线程被创建后默认会一直保活,但是可以设置核心线程也进行超保活时间销毁;
pool.allowCoreThreadTimeOut(true);
Executors创建线程池(谨慎使用),不推荐
public static void main(String[] args) {
ExecutorService e1 = Executors.newSingleThreadExecutor();
e1.execute(new Runnable() {
@Override
public void run() {
System.out.println("处理任务逻辑");
}
});
}
除newScheduledThreadPool
内部实现为ScheduledThreadPoolExecutor
(实则最终还是ThreadPoolExecutor+延迟队列)外,其余三种内部实现都是ThreadPoolExecutor
-
newCachedThreadPool
它使用的是默认拒绝策略线程池总线程大小为
Integer.MAX_VALUE
(最大int值),哈哈哈...任务一多,线程资源会被耗尽导致程序宕机
并且采用
SynchronousQueue
有界队列,它比较特殊SynchronousQueue
是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。与其将这个类称为队列,不如称其为一个数据节点。 特别适合做交换数据用,内部使用队列来实现公平性的调度,使用栈来实现非公平的调度,在Java6时替换了原来的锁逻辑,使用CAS( Compare-and-Swap ,自旋锁)代替了,效率更高了。 -
newFixedThreadPool
看看任务队列容量:Integer.MAX_VALUE
值,哈哈哈...
newScheduledThreadPool
用于处理定时任务的线程池
Java 5 推出了基于线程池设计的 ScheduledExecutor
。其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor
才会真正启动一个线程,其余时间 ScheduledExecutor
都是在轮询任务的状态。
-
newSingleThreadExecutor
虽然一个线程,但是任务队列倒挺大
拒绝策略
1、由提交任务的线程处理超出的任务(那么再继续添加新任务会出现阻塞);
2、 丢弃任务并抛出
RejectedExecutionException
异常;(默认拒绝策略)3、直接丢弃,不会出现异常;
4、 丢弃队列最前面的任务,然后重新提交被拒绝的任务;
5、自定义拒绝策略;(比如我记录一下关键日志)
// 任务目标
class MyTask implements Runnable {
String data;
public MyTask(String data) {
this.data = data;
}
@Override
public void run() {
System.out.println("正在处理:" + data);
try {
// 假设每个任务需要处理20毫秒
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return data;
}
}
// 自定义拒绝策略
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//打印丢弃的任务
System.out.println(r.toString() + " is discard");
}
}
// 测试
public class TestMyReject {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
1, // 核心线程
2, // 总线程数
1, // 保活时间
TimeUnit.SECONDS, // 保活时间单位
new ArrayBlockingQueue<>(2),// 任务队列
new MyRejectedExecutionHandler());//拒绝策略
for (int i = 1; i <= 100; i++) {
final int task = i;
threadPoolExecutor.execute(new MyTask("第" + task + "个任务"));
}
}
}
结果,大量的任务走了拒绝策略
ThreadLocal
是个啥?它能干嘛
我们可以通过ThreadLocal往当前线程中存储数据,然后只要当前线程还没有结束,我们可以在任何该线程执行到的代码位置,把我们存储的数据给拿出来使用。
常见使用,或许你没有注意
- PageHelper设置分页参数,然后在执行SQL的时候,会先过拦截器,从线程中把pageNum、pageSize给拿到,进行分页SQL组装,实现分页查询逻辑;
-
获取用户请求数据RequestContextHolder.getRequestAttributes();或RequestContextHolder.currentRequestAttributes()
如:@Autowired private HttpServletRequest request;或者@Autowired private HttpSession session;其底层实现为Spring对单例对象注入非单例实例的一个解决方案:Scoped Proxy, Spring巧妙地注入了一个装饰器代理了真实对象的操作。再继续往底层挖,实则是最终在代理逻辑中最后调用的是RequestContextHolder.currentRequestAttributes()
RequestContextHolder.getRequestAttributes()一样如此
-
SpringSecurity框架中SecurityContextHolder.getContext()
对ThreadLocal做一下分析
通过ThreadLocal对象调用其set方法
再看
继续
然后我们明白了,通过ThreadLocal对象协调,最终我们将数据存储进入了当前线程对象的属性中,其属性是一个map,key为协调者即ThreadLocal对象,value为我们存储的数据对象。意味着我们可以定义不同的协调对象,往线程的ThreadLocalMap中存储更多的我们想要存储的数据对象。
通过ThreadLocal对象调用其get方法
取值的时候,肯定,也是通过协调者,将其自身做为key从当前线程对象的属性ThreadLocalMap中取与之对应存储的数据对象。