多线程编程之两阶段终止模式

       对于多线程编程,如何优雅的终止子线程,始终是一个值得考究的问题。如果直接终止线程,可能会产生三个问题:

  • 子线程当前执行的任务可能必须要原子的执行,即其要么成功执行,要么就不执行;
  • 当前任务队列中还有未执行完的任务,直接终止线程可能导致这些任务被丢弃;
  • 当前线程占用了某些外部资源,比如打开了某个文件,或者使用了某个Socket对象,这些都是无法被垃圾回收的对象,必须由调用方进行清理。

       由此可见,如何优雅的终止一个线程,并不是一个简单的问题。常见的终止线程的方式是,声明一个标志位,如果调用方要终止其创建的线程的执行,就将该标志位设置为需要终止状态,子线程每次执行任务之前会检查该标志位,如果为需要终止状态,就不继续执行任务,而是进行当前线程所占用资源的一些清理工作,如关闭Socket和备份当前未完成的任务,清理完成之后结束当前线程的调用。

       两阶段终止模式使用的就是上述方式进行多线程的终止的,只不过其将线程的终止封装为一个特定的框架,使用者只需要关注特定的任务执行方式即可,从而实现了线程的终止与任务的执行的关注点的分离。两阶段终止模式的UML图如下:

两阶段终止模式类图

       其各角色的作用如下:

  • ThreadOwner:客户端程序,由其创建线程并执行任务,Terminatable提供的终止方法也是由其调用;
  • Terminatable:终止方法提供的一个抽象接口,提供了一个terminate()方法供外部调用;
  • TerminatableSupport:实现了Terminatable接口的抽象类,封装了具体的终止模板,其doRun()是一个抽象方法,子类必须实现,用于编写相关的任务的代码,doTermiate()和doCleanup()方法都是钩子方法,提供了空的实现,子类根据具体情况判断是否需要实现该方法;
  • ConcreteTerminatable:用户具体的终止类,其doRun()方法用于实现具体的任务;
  • TerminationToken:包含了一个标志位,并且记录了当前线程还需要执行的任务数量,默认情况下,只有其标志位为true,并且剩余需要执行的任务数为0时才会真正的终止当前线程的执行。

       如下是两阶段终止模式各个类的实现,我们首先看看Terminatable接口及其抽象实现TerminatableSupport:

public interface Terminatable {
  void terminate();
}
public abstract class TerminatableSupport extends Thread implements Terminatable {
  public final TerminationToken terminationToken;  // 记录当前的标志位

  public TerminatableSupport() {
    this(new TerminationToken());  // 初始化当前标志位
  }

  public TerminatableSupport(TerminationToken terminationToken) {
    super();
    this.terminationToken = terminationToken;  // 初始化标志位
    terminationToken.register(this);  // 注册当前对象的标志位
  }

  protected abstract void doRun() throws Exception;  // 供子类实现具体任务的方法

  // 钩子方法,用于子类进行一些清理工作
  protected void doCleanup(Exception cause) {}

  // 钩子方法,用于子类进行终止时的一些定制化操作
  protected void doTerminate() {}

  @Override
  public void run() {
    Exception ex = null;
    try {
      // 在当前线程中执行任务时,会判断是否标识为终止,并且剩余任务数小于等于0,是才会真正终止当前线程
      while (!terminationToken.isToShutdown() || terminationToken.reservations.get() > 0) {
        doRun();
      }
    } catch (Exception e) {
      ex = e;
    } finally {
      try {
        doCleanup(ex);  // 当前线程终止后需要执行的操作
      } finally {
        terminationToken.notifyThreadTermination(this);
      }
    }
  }

  @Override
  public void interrupt() {
    terminate();
  }

  @Override
  public void terminate() {
    terminationToken.setToShutdown(true);  // 设置终止状态
    try {
      doTerminate();  // 执行客户端定制的终止操作
    } finally {
      if (terminationToken.reservations.get() <= 0) {
        super.interrupt();  // 如果当前线程处于终止状态,则强制终止当前线程
      }
    }
  }

  // 提供给客户端调用的,即客户端线程必须等待终止完成之后才会继续往下执行
  public void terminate(boolean waitUntilThreadTerminated) {
    terminate();
    if (waitUntilThreadTerminated) {
      try {
        this.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

       当客户端调用termiante()方法时,其首先会将当前的终止状态设置为true,然后调用doTerminate()方法,这里需要注意的一点是,如果当前线程在doRun()方法中处于等待状态,比如Thread.sleep()、Thread.wait()方法等,那么即使设置了终止状态,也无法使其被唤醒,因为其无法运行到检测终止状态的代码处,其只能使用intertupt()方法才能使其被唤醒并终止,但是对于Socket.read()方法,即使调用了interrupt()方法,也无法使其终止,因而这里设置了doTerminate()方法,用于子类在该方法中关闭Socket。最后在finally块中,调用super.interrupt()方法,该调用的作用也即如果当前线程在doRun()方法中被阻塞,就强制终止其执行。

public class TerminationToken {
  protected volatile boolean toShutdown = false;  // 终止状态的标志位
  public final AtomicInteger reservations = new AtomicInteger(0);  // 记录当前剩余任务数

  // 记录了所有注册了TerminationToken的实例,这里使用Queue是因为可能会有多个
  // Terminatable实例共享同一个TeraminationToken,如果是共享的,那么reservations
  // 实例就保存了所有共享当前TerminationToken实例的线程所需要执行的任务总数
  private final Queue<WeakReference<Terminatable>> coordinatedThreads;

  public TerminationToken() {
    coordinatedThreads = new ConcurrentLinkedQueue<>();
  }

  public boolean isToShutdown() {
    return toShutdown;
  }

  public void setToShutdown(boolean toShutdown) {
    this.toShutdown = toShutdown;
  }

  // 将当前Terminatable实例注册到当前TerminationToken中
  protected void register(Terminatable thread) {
    coordinatedThreads.add(new WeakReference<>(thread));
  }

  // 如果是多个Terminatable实例注册到当前TerminationToken中,
  // 则广播当前的终止状态,使得这些实例都会终止
  protected void notifyThreadTermination(Terminatable thread) {
    WeakReference<Terminatable> wrThread;
    Terminatable otherThread;
    while (null != (wrThread = coordinatedThreads.poll())) {
      otherThread = wrThread.get();
      if (null != otherThread && otherThread != thread) {
        otherThread.terminate();
      }
    }
  }
}

       关于Terminatable和TerminationToken的关系是一对多的关系,即多个Terminatable实例可共用一个TerminationToken实例,而其reservations属性所保存的则是这多个Terminatable实例所共同要完成的任务数量。这里典型的多个Terminatable共用一个TerminationToken实例的例子是当有多个工作者线程时,这几个线程所消费的任务是共用的,因而其TermiantionToken实例也需要共用。

       两阶段终止模式的使用场景非常的多,基本上只要是使用了子线程的位置都需要使用一定的方式来优雅的终止该线程的执行。我们这里使用生产者和消费者的例子来演示两阶段终止模式的使用,如下是该例子的代码:

public class SomeService {
  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

  private final Producer producer = new Producer();
  private final Consumer consumer = new Consumer();

  public static void main(String[] args) throws InterruptedException {
    SomeService ss = new SomeService();
    ss.init();
    TimeUnit.SECONDS.sleep(500);
    ss.shutdown();
  }

  // 停止生产者和消费者的执行
  public void shutdown() {
    producer.terminate(true);  // 先停止生产者,只有在生产者完全停止之后才会停止消费者
    consumer.terminate();  // 停止消费者
  }

  // 启动生产者和消费者
  public void init() {
    producer.start();
    consumer.start();
  }

  // 生产者
  private class Producer extends TerminatableSupport {
    private int i = 0;

    @Override
    protected void doRun() throws Exception {
      queue.put(String.valueOf(i++));  // 将任务添加到任务队列中
      consumer.terminationToken.reservations.incrementAndGet();  // 更新需要执行的任务数量
    }
  }

  // 消费者
  private class Consumer extends TerminatableSupport {
    @Override
    protected void doRun() throws Exception {
      String product = queue.take();  // 获取任务
      System.out.println("Processing product: " + product);
      try {
        TimeUnit.SECONDS.sleep(new Random().nextInt(100));  // 模拟消费者对任务的执行
      } catch (InterruptedException e) {
        // ignore
      } finally {
        terminationToken.reservations.decrementAndGet();  // 更新需要执行的任务数量
      }
    }
  }
}

       可以看到,在子类使用两阶段终止模式时,其只需要实现各自所需要执行的任务,并且更新当前任务的数量即可。在某些情况下,当前任务的数量也可以不进行更新,比如在进行终止时,不关心当前剩余多少任务需要执行。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容