什么是线程
线程 是操作系统能够运行的最小执行单元,被包含在进程之中,而进程,可以广泛的理解为一个 application,一个进程可以包含多个线程,进程和线程之间是包含与被包含的关系,线程的引入主要是为了让应用拥有多任务处理的能力
- 进程是一个容器,里面至少含有一个线程, 比如 Android 中的主线程(main)
- 线程是进程容器中的一个执行单元
- 操作系统按照进程分配资源
- 多个线程可以共享进程中的所以资源,比如内存等
- CPU 最终执行的是线程
- 每个线程都有自己的线程栈和程序执行计数器,保存自己的函数调用过程
Java 中的线程
开启一个线程,Java 中提供了两种方式
继承
Thread
,直接调用start
-
实现
Runable
,然后Thread helloThread = new Thread(new HelloRunnable()); helloThread.start();
两者有什么区别呢,打开Thread
源码,发现:
public class Thread implements Runnable {
........
}
Thread
直接继承Runable
,而Runable
只是一个普通的接口,用来封装一些一个 task,真正开启线程并启动的是 Thread类的 start
方法
public synchronized void start() {//防止多个线程引发异常
//判断是否已经 start 否则抛出异
if (threadStatus != 0 || started)
throw new IllegalThreadStateException();
group.add(this);//加入到当前的线程组中
started = false;//恢复当前线程的 started
try {
//在 C层用 pthred创建一个线程
nativeCreate(this, stackSize, daemon);
started = true;//修改当前线程的 started 状态
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
可以看出,一个线程,真正的启动过程:
group.add(this)
将当前线程加入到线程组里nativeCreate(this, stackSize, daemon);
通过 JNI 使用 Linux 的pthread
创建C线程,这里才是真正的线程创建的地方JNI 回调 Java, 调用Thread 类的
run
方法,最终回调Runable
接口中的实现task,此时,已经切换到新的线程环境
线程间状态:
在 Thread
类中用一个枚举表示,
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
一图胜前言
多线程通信
Java 多线程通信,通常有以下两种形式
一.共享内存机制
因为多个线程共享进程中的内存资源,所以我们可以用共享内存的方式实现多个线程之间的通信,
线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信。而线程间的同步是显式的,程序必须指定某个字段在多个线程间互斥执行
最简单的,我们经常会设置一个共享变量。然后多个线程去操作同一个共享变量。从而达到线程通讯的目的,属于线程之间的间接通信.线程间的同步是隐式
下面是经典的抢票流程:
实现代码:
/**
* 售票窗口,不断的卖票直到票卖完
* @author chengwangyong
* @date 2017/11/12
*/
public class TicketWindow extends Thread {
TicketWindow(String name) {
super(name);
}
@Override
public void run() {
while (TicketConstance.ticketCount > 0) {
System.out.println(Thread.currentThread().getName() + " 售出了第" + TicketConstance.ticketCount + "票");
TicketConstance.ticketCount--;
}
}
}
测试类:
public class ThreadTest {
public static void main(String[] args) {
TicketWindow ticket1 = new TicketWindow("窗口1");
TicketWindow ticket2 = new TicketWindow("窗口2");
TicketWindow ticket3 = new TicketWindow("窗口3");
TicketWindow ticket4 = new TicketWindow("窗口4");
ticket1.start();
ticket2.start();
ticket3.start();
ticket4.start();
}
}
结果
窗口2 售出了第100张票
窗口1 售出了第100张票
窗口4 售出了第100张票
窗口3 售出了第100张票
窗口4 售出了第97张票
窗口1 售出了第98张票
窗口2 售出了第99张票
窗口2 售出了第93张票
窗口2 售出了第92张票
窗口2 售出了第91张票
窗口2 售出了第90张票
窗口1 售出了第94张票
......
发现,第100张票买了四次,明显出现了问题,
原因出在了两个方面:
-
ticketCount
内存可见性,线程1修改了值,但是因为 CPU 还有三级缓存,不一定线程2可见 -
ticketCount--
并不是原子操作,分为三个步骤- 取counter的当前值
- 在当前值基础上加1
- 将新值重新赋值给counter
解决办法:
-
给调用过程加锁,让同一时间只有一个线程去访问被保护的代码,从而让保护的代码变成一个原子操作
public class TicketWindow extends Thread { TicketWindow(String name) { super(name); } @Override public void run() { ticket(); } private void ticket() { synchronized (TicketWindow.class) { while (TicketConstance.ticketCount > 0) { System.out.println(Thread.currentThread().getName() + " 售出了第" + TicketConstance.ticketCount + "张票"); TicketConstance.ticketCount--; } } } }
注意
synchronized
是加到类上面的.synchronized加到方法上和不加任何参数形式,都是加到对象上面的,对同一个对象有效,而加到类上则对同一个类的不同对象都有效,synchronized
大致的执行流程如下- 线程 A 尝试获得锁,如果能获得锁,则执行,不能,加入到线程的等待队列中(wait()),并且阻塞当前的调用(线程状态BLOCKED)
- 执行锁里面的代码
- 释放锁,从等待队列中唤醒一个线程接着往下执行(notify() 锁可重用),如果有多个,随机唤醒一个,并且把修改后的内容写入到内存,保证内存可见性
-
保证原子性操作:
可以给
ticketCount—
加一个volatile
修饰,这样ticketCount--
则变为原子性,每次修改 Java 都能保证每次修改写入到内存中,其他线程可见,并且,volatile
比synchronized
更加的轻量级,如果能用volatile
解决,就不要用synchronized
public static volatile int ticketCount = 100;
二. 消息通信机制
线程之间的直接通信,不同的线程之间通过显式的发送消息来达到交互目的,更加符合面向对象的机制
Java中的每个对象,都有一个锁和线程等待队列,并且在Object
类中,加入了wait
/notify
/notifyAll
方法,让每个对象都拥有控制线程执行状态的能力
wait
: 当前线程放入线程等待队列,释放锁,线程状态变为WAITING
或 者TIMED_WAITING
(取决于是否设置等待时间)
notify
: 从当前对象的等待队列中,随机选择一个唤醒,竞争对象锁
,获取到则进入Running
状态执行,否则加入等待队列,等待下一次唤醒
notifyAll
: 和notify
类似,不同的是唤醒所有等待队列的线程,让所有被唤醒的线程一起竞争对象锁,胜者执行,其他的继续等待下一次唤醒
演示一个基本的线程间互相通信的过程
让主线程执行完毕后通知子线程去执行:
public class ThreadOne extends Thread {
@Override
public void run() {
super.run();
add(100);
}
public synchronized int add(int num) {
try {
wait();
num = num + 100;
System.out.println("num=" + num);
} catch (InterruptedException e) {
e.printStackTrace();
}
return num;
}
public synchronized void notifyThread(){
notify();
}
}
测试代码
public static void main(String[] args) {
ThreadOne threadOne = new ThreadOne();
threadOne.start();
try {
Thread.sleep(1000L);
threadOne.notifyThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
可以看出,threadOne
被wait
之后,只有 notify
后才能从wait()
的下一行接着往下执行;
下面基于消息通信机制,实现一个最常见的生产者/消费者模型
生产者/消费者模型
:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列满,生产者暂停生成,如果队列空,消费者等待
任务队列代码:
public class TaskQueue<E> {
private Queue<E> taskQueue; //任务队列
private int maxSize; //任务队列最大能装载的任务
public TaskQueue(int maxSize) {
this.maxSize = maxSize;
taskQueue = new ArrayDeque<>(maxSize);
}
// 生产者将数据存入到队列中
public synchronized void put(E e) throws InterruptedException {
if (taskQueue.size() == maxSize) {
wait(); // 如果队列已满 生产者等待状态
}
taskQueue.add(e);
notifyAll();
}
// 消费者从队列中拉取数据
public synchronized E get() throws InterruptedException {
if (taskQueue.isEmpty()) {
wait();// 如果队列已空 消费者者等待状态
}
E e = taskQueue.poll();
notifyAll();
return e;
}
}
生产者代码:
public class Producer extends Thread {
TaskQueue<String> queue;
public Producer(TaskQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0; // 任务序号
try {
while (true) { // 不停的生产
String task = String.valueOf(num);
queue.put(task);
System.out.println("生产者 生产task " + task);
num++;
Thread.sleep((int) (Math.random() * 100)); // 保证生产者和消费者不同的间隔,乱序
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者代码:
public class Consumer extends Thread {
TaskQueue<String> queue;
public Consumer(TaskQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) { // 不停的从任务队列中获取
String task = queue.get();
System.out.println("消费者 消费 task " + task);
Thread.sleep((int) (Math.random() * 100)); // 保证生产者和消费者不同的间隔,乱序
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试代码:
public static void main(String[] args){
TaskQueue<String> queue = new TaskQueue<>(10);
new Producer(queue).start();
new Consumer(queue).start();
}
输出:
生产者 生产task 0
消费者 消费 task 0
生产者 生产task 1
生产者 生产task 2
消费者 消费 task 1
消费者 消费 task 2
生产者 生产task 3
生产者 生产task 4
消费者 消费 task 3
生产者 生产task 5
消费者 消费 task 4
消费者 消费 task 5
生产者 生产task 6
.......
可以看出,基于消息通信机制的代码,比共享变量控制颗粒更小,更复杂,更适合复杂的多线程环境