软件都是由不同的模块组成一个系统,从而模块与模块间的通信,线程与线程间通信是经常碰到的。下面我们介绍了关于线程间通信的几种技术方案
1.wait/notify
1.1不使用wait/notify的代码演示
public class TpadTask {
private List<Integer> list = new ArrayList<>();
public void doAdd() {
Integer num = new Random().nextInt(100);
list.add(num);
}
public int size() {
return list.size();
}
}
public class ThreadA extends Thread {
private TpadTask task;
public ThreadA(TpadTask task) {
super();
this.task = task;
}
@Override
public void run() {
for(int i=0;i<10;i++){
task.doAdd();
System.out.println(i);
}
}
}
public class ThreadB extends Thread {
private TpadTask task;
public ThreadB(TpadTask task) {
super();
this.task = task;
}
@Override
public void run() {
try {
while (true) {
if (task.size() == 5) {
System.out.println("已达到5,b线程要退出");
throw new InterruptedException();
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
public class Client {
public static void main(String[] args) {
TpadTask task = new TpadTask();
ThreadA a = new ThreadA(task);
ThreadB b = new ThreadB(task);
a.start();
b.start();
}
}
说明:线程A是进行对list进行添加数据,线程B是对list的大小进行监控,如果达到5,则主动退出,抛出异常。
- 结果
0
1
2
3
4
已达到5,b线程要退出
5
6
7
8
9
java.lang.InterruptedException
at communication.ThreadB.run(ThreadB.java:21)
缺点:
- 线程B需要不停的循环来判断定条件,这样会浪费CPU资源。
1.2 wait/notify机制
- 1.wait()
wait()的作用是使当前执行的线程进行等待,wait()方法是Object类的方法。该方法是将当前线程置入预执行队列,并在wait代码处停止执行,直到接收到通知或被中断执行。
在调用wait()之前,线程必须获取该对象的对象级别锁,即只能在同步方法或者同步代码块中调用wait()方法。
在执行wait()方法后,当前线程释放锁。从wait()返回前,线程与其他线程竞争重新获取锁。如果调用wait()没有持有锁,则会抛出异常。
- wait方法的注释
The current thread must own this object's monitor.
- 2.notify()
notify()的作用是使停止的线程继续运行。
在调用notify()之前,线程必须获取该对象的对象级别锁,也只能在同步方法或者同步代码块调用
执行notify()方法后,当前线程不会马上释放该对象锁。呈现wait的线程也不会马上获取到该对象锁。要等到执行notify()方法的线程将程序执行完。也就是退出synchronized代码块后,当前线程才会释放该对象锁。
3.改造上述代码
TpadTask代码不变
ThreadA代码改造
public class ThreadA extends Thread {
private TpadTask task;
private Object lock;
public ThreadA(TpadTask task, Object obj) {
super();
this.task = task;
this.lock = obj;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
synchronized (lock) {
task.doAdd();
System.out.println(i);
if (task.size() == 5) {
lock.wait();
System.out.println("接收到信息!!!");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- ThreadB
public class ThreadB extends Thread {
private TpadTask task;
private Object lock;
public ThreadB(TpadTask task, Object obj) {
super();
this.task = task;
this.lock = obj;
}
@Override
public void run() {
try {
synchronized (lock) {
if (task.size() == 5) {
lock.notify();
System.out.println("已发出信号!!!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
- 测试
public class Client {
public static void main(String[] args) throws InterruptedException {
TpadTask task = new TpadTask();
Object lock = new Object();
ThreadA a = new ThreadA(task, lock);
a.start();
ThreadB b = new ThreadB(task, lock);
b.start();
}
}
- 结果
0
1
2
3
4
已发出信号!!!
接收到信息!!!
5
6
7
8
9
- 4.线程状态切换
- 5.wait(long timeout)
等待一段时间,看是否有线程对锁进行唤醒。如果超过这个时间,则自动唤醒。
2.通过管道流进行通信(pipeStream)
jdk提供了4个类用于管道流通信。java.io.PipedInputStream、java.io.PipedOutputStream、java.io.PipedReader、java.io.PipedWriter
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。
大致的流程是:我们在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据。就可以实现,线程A和线程B的通信。
PipedOutputStream、PipedInputStream代码演示
新建生产消息线程
public class Producer extends Thread {
private PipedOutputStream pos;
public Producer(PipedOutputStream pos) {
this.pos = pos;
}
@Override
public void run() {
super.run();
try {
pos.write("Hello".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 新建接收消息线程
public class Consumer extends Thread {
private PipedInputStream pis;
public Consumer(PipedInputStream pis) {
this.pis = pis;
}
@Override
public void run() {
super.run();
byte[] b = new byte[100]; // 将数据保存在byte数组中
try {
int len = pis.read(b); // 从数组中得到实际大小。
System.out.println(String.format("接收到信号:%s",new String(b, 0, len)));
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 测试
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream();
try {
pos.connect(pis);// 连接管道
new Producer(pos).start();// 启动线程
new Consumer(pis).start();// 启动线程
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 结果
接收到信号:Hello
pos.connect(pis)作用是将“管道输入流”和“管道输出流”关联起来。查看PipedWriter.java和PipedReader.java中connect()的源码;我们知道 out.connect(in); 等价于 in.connect(out)。当然JDK还支持管道通信字符流。
3.Join
主线程创建并启动子线程,如果子线程需要消耗大量的资源,主线程往往早于子线程执行完毕。而主线程需要等待子线程的结果,这时候使用join方法。
- 例子
public class CustomerThread extends Thread {
public CustomerThread(){
super();
}
@Override
public void run() {
System.out.println("我是子线程代码");
}
}
public class Client {
public static void main(String[] args) throws InterruptedException {
CustomerThread a=new CustomerThread();
a.start();
System.out.println("我是主线程代码");
}
}
- 输出
我是主线程代码
我是子线程代码
主线程已经执行完毕了,才执行子线程,如果需要用到子线程的结果。那么只需要加一句join()。
public class Client {
public static void main(String[] args) throws InterruptedException {
CustomerThread a=new CustomerThread();
a.start();
a.join();//join语句
System.out.println("我是主线程代码");
}
}
- 结果
我是子线程代码
我是主线程代码
join的方法注释是:
/**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*/
while (isAlive()) {
wait(0);
}
使得所属的线程对象正常的执行,使得当前的线程进行无线等待,直到所属线程销毁后,再执行当前线程。
在join过程中,如果当前线程对象被中断,则当前线程对象抛出异常。
join(long)中的参数是等待的时间。例如join(2000),只等等2秒子线程的执行时间。超过2秒自动执行主线程代码。
3.1join(long)与sleep(long)的区别
join内部使用的是wait(long)来实现的。所以join()具有释放锁的特性。当前线程的锁被释放,那么其他线程就可以调用此线程的同步方法了。
Thread.sleep(long)方法却不释放锁。
4.ThreadLocal
ThreadLocal并不是一个Thread,而是Thread的局部变量,也许把它命名为ThreadLocalVariable更容易让人理解一些。
ThreadLocal是解决每个线程绑定属于自己的值。
ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本。解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。
代码演示
新建ThreadLocal公用类
public class ThreadLocalVariable {
public static ThreadLocal threadLocalVariable = new ThreadLocal();
}
- 新建ThreadA
public class ThreadA extends Thread {
@Override
public void run() {
try {
for(int i=0;i<5;i++){
ThreadLocalVariable.threadLocalVariable.set(String.format("ThreadA:%s",i));
System.out.println(ThreadLocalVariable.threadLocalVariable.get());
}
}
catch (Exception ex){
ex.printStackTrace();
}
}
}
- 新建ThreadB
public class ThreadB extends Thread {
@Override
public void run() {
try {
for(int i=0;i<5;i++){
ThreadLocalVariable.threadLocalVariable.set(String.format("ThreadB:%s",i));
System.out.println(ThreadLocalVariable.threadLocalVariable.get());
}
}
catch (Exception ex){
ex.printStackTrace();
}
}
}
- Client
public class Client {
public static void main(String[] args) {
ThreadA a = new ThreadA();
ThreadB b = new ThreadB();
a.start();
b.start();
ThreadLocalVariable.threadLocalVariable.set("a");
System.out.println(ThreadLocalVariable.threadLocalVariable.get());
}
}
- 结果
a
ThreadA:0
ThreadA:1
ThreadA:2
ThreadA:3
ThreadA:4
ThreadB:0
ThreadB:1
ThreadB:2
ThreadB:3
ThreadB:4
每个线程都对threadLocalVariable进行设置值,但是取值的时候,都是取的各自的设置的值。
没赋值前返回的值是null
protected T initialValue() {
return null;
}
4.1InheritableThreadLocal
该类可以让子线程从父线程中取出值。而子线程从父类继承的值可以在子线程进行对值得修改