概述
线程间协作即有多个线程需要按照一定顺序相互协作进行。主要有两种方法来实现,使用锁(互斥)来同步两个任务的行为。另一种是使用BlockingQueue,它已经帮我们处理好了同步机制,实现更加简单。
举例
接下来以一个实际场景为例,进行演示。假设在一个餐馆中有一个服务员,有一个厨师,而服务员要等到厨子把菜做好了才能上菜,然后回来继续等待。而厨师得到新订单后开始做菜。用两种方式实现之前,我们分析知厨师和服务员分别是一个独立的线程,他们通过餐厅联结在一起。在这个模型中厨师代表生产者,服务员代表消费者。Order是他们共享的资源,需要进行同步。
使用锁的互斥
- 菜
public class Order {
private int num=0;
public Order(int num) {
this.num=num;
}
@Override
public String toString() {
return "order num:"+num;
}
}
- 餐馆
public class Restaurant {
Order order;
Chef chef=new Chef(this);
Waiter waiter=new Waiter(this);
ExecutorService executorService= Executors.newCachedThreadPool();
public Restaurant() {
order =null;
executorService.execute(chef);
executorService.execute(waiter);
try {
TimeUnit.SECONDS.sleep(5);
}catch (Exception e){
e.printStackTrace();
}
executorService.shutdown();
}
public static void main(String[] args){
new Restaurant();
}
}
- 厨师
public class Chef implements Runnable {
private Restaurant restaurant;
private int counter=0;
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try{
while (!Thread.interrupted()){
synchronized (this){
while (restaurant.meal!=null){
wait();//等服务员上菜,获得新订单
}
}
synchronized (restaurant.waiter){
//获得服务员的锁,让他等我做菜
restaurant.meal=new Meal(counter++);
System.out.print("a meal is done");
Thread.sleep(500);
restaurant.waiter.notifyAll();
//告诉服务员可以上菜了
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
- 服务员
public class Waiter implements Runnable {
private Restaurant restaurant;
public Waiter(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
synchronized (this){
while (restaurant.order ==null){
wait();//等待厨师做完菜后被chef的notifyAll()唤醒,注意wait()会释放当前获得的锁
}
}
synchronized (restaurant.chef){
System.out.print("waiter: order up\n");
restaurant.order =null;
restaurant.chef.notifyAll();//告诉厨师可以做菜了
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
由此可见chef与waiter按照顺序协调了
使用BlockingQueue同步
- 创建自己的BlockingQueue
public class MealQueue extends LinkedBlockingQueue<Order> {
}
- 餐馆
public class Restaurant {
private MealQueue waitQueue;
private MealQueue finishedQueue;
private Chef chef;
private Waiter waiter;
public Restaurant() {
waitQueue = new MealQueue();
finishedQueue = new MealQueue();
chef = new Chef(waitQueue, finishedQueue);
waiter = new Waiter(waitQueue, finishedQueue);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(chef);
executorService.execute(waiter);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
public static void main(String[] args){
new Restaurant();
}
}
- 厨师
public class Chef implements Runnable{
private MealQueue waitQueue;
private MealQueue finishedQueue;
public Chef(MealQueue waitQueue, MealQueue finishedQueue) {
this.waitQueue = waitQueue;
this.finishedQueue = finishedQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Order order =waitQueue.take();
Thread.sleep(500);
System.out.print("chef:order done "+ order.toString()+"\n");
finishedQueue.add(order);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 服务员
public class Waiter implements Runnable{
private MealQueue waitQueue;
private MealQueue finishedQueue;
private int count;
public Waiter(MealQueue waitQueue, MealQueue finishedQueue) {
this.waitQueue = waitQueue;
this.finishedQueue = finishedQueue;
count=0;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Order newOrder=new Order(count++);
waitQueue.add(newOrder);
System.out.print("waiter:a new order\n");
Order order =finishedQueue.take();
System.out.print("waiter:order complete "+ order.toString()+"\n");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
在这个版本中,我们没有在任何一个地方显示加锁,但它仍能有序进行非常简单
总结
我们通过两种方法完成了线程的协作,个人觉得使用BlockingQueuer更容易也更好管理。最后还有一个例子模拟生产吐司面包,第一步制作吐司,第二步抹黄油,第三步涂果酱。代码已同步到github,不再赘述。如发现错误,欢迎指正。