Google-Guava Concurrent 包里的 Service 框架浅析

概述

Guava 包里的 Service 接口用于封装一个服务对象的运行状态、包括 start 和 stop 等方法。例如 web 服务器,RPC 服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启/关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava 包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。

使用一个服务

一个服务正常生命周期有:

服务一旦被停止就无法再重新启动了。如果服务在 starting、running、stopping 状态出现问题、会进入 Service.State.FAILED.状态。调用 startAsync()方法可以异步开启一个服务,同时返回 this 对象形成方法调用链。注意:只有在当前服务的状态是 NEW 时才能调用 startAsync()方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法 stopAsync() 。但是不像 startAsync(),多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。

Service 也提供了一些方法用于等待服务状态转换的完成:

通过 addListener()方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加 Listener(这时的状态是 NEW)、否则之前已发生的状态转换事件是无法在新添加的 Listener上被重新触发的。

同步使用 awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出 IllegalStateException 异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。

Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承 Guava 包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。

基础实现类

AbstractIdleService

AbstractIdleService 类简单实现了 Service 接口、其在 running 状态时不会执行任何动作–因此在 running 时也不需要启动线程–但需要处理开启/关闭动作。要实现一个此类的服务,只需继承 AbstractIdleService 类,然后自己实现 startUp()shutDown()方法就可以了。

    protected void startUp() {
    servlets.add(new GcStatsServlet());
    }
    protected void shutDown() {}

如上面的例子、由于任何请求到 GcStatsServlet 时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。

AbstractExecutionThreadService

AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载 run()方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:

    public void run() {
      while (isRunning()) {
        // perform a unit of work
      }
    }

另外,你还可以重载 triggerShutdown()方法让 run()方法结束返回。

重载 startUp()和 shutDown()方法是可选的,不影响服务本身状态的管理

    protected void startUp() {
    dispatcher.listenForConnections(port, queue);
     }
     protected void run() {
       Connection connection;
       while ((connection = queue.take() != POISON)) {
     process(connection);
       }
     }
     protected void triggerShutdown() {
       dispatcher.stopListeningForConnections(queue);
       queue.put(POISON);
     }

start()内部会调用 startUp()方法,创建一个线程、然后在线程内调用 run()方法。stop()会调用 triggerShutdown()方法并且等待线程终止。

AbstractScheduledService

AbstractScheduledService 类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的 startUp()和 shutDown()方法。为了能够描述执行周期,你需要实现 scheduler()方法。通常情况下,你可以使用 AbstractScheduledService.Scheduler 类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),类似于 JDK 并发包中 ScheduledExecutorService 类提供的两种调度方式。如要自定义 schedules 则可以使用 CustomScheduler 类来辅助实现;具体用法见 javadoc。

AbstractService

如需要自定义的线程管理、可以通过扩展 AbstractService 类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承 AbstractService 类。

继承 AbstractService 方法必须实现两个方法.

  • doStart(): 首次调用 startAsync()时会同时调用 doStart(),doStart()内部需要处理所有的初始化工作、如果启动成功则调用 notifyStarted()方法;启动失败则调用 notifyFailed()

  • doStop(): 首次调用 stopAsync()会同时调用 doStop(),doStop()要做的事情就是停止服务,如果停止成功则调用 notifyStopped()方法;停止失败则调用 notifyFailed()方法。

doStart 和 doStop 方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。

使用 ServiceManager

除了对 Service 接口提供基础的实现类,Guava 还提供了 ServiceManager 类使得涉及到多个 Service 集合的操作更加容易。通过实例化 ServiceManager 类来创建一个 Service 集合,你可以通过以下方法来管理它们:

  • startAsync() : 将启动所有被管理的服务。如果当前服务的状态都是 NEW 的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
  • stopAsync() :将停止所有被管理的服务。
  • addListener() : 会添加一个 ServiceManager.Listener,在服务状态转换中会调用该 Listener
  • awaitHealthy() :会等待所有的服务达到 Running 状态
  • awaitStopped():会等待所有服务达到终止状态

检测类的方法有:

  • isHealthy() :如果所有的服务处于 Running 状态、会返回 True
  • servicesByState():以状态为索引返回当前所有服务的快照
  • startupTimes() :返回一个 Map 对象,记录被管理的服务启动的耗时、以毫秒为单位,同时 Map 默认按启动时间排序。

我们建议整个服务的生命周期都能通过 ServiceManager 来管理,不过即使状态转换是通过其他机制触发的、也不影响 ServiceManager 方法的正确执行。例如:当一个服务不是通过 startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy()也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于 New 状态。

附:TestCase、也可以作为练习 Demo

ServiceTest

    /*
     * Copyright (C) 2013 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static com.google.common.util.concurrent.Service.State.FAILED;
    import static com.google.common.util.concurrent.Service.State.NEW;
    import static com.google.common.util.concurrent.Service.State.RUNNING;
    import static com.google.common.util.concurrent.Service.State.STARTING;
    import static com.google.common.util.concurrent.Service.State.STOPPING;
    import static com.google.common.util.concurrent.Service.State.TERMINATED;

    import junit.framework.TestCase;

    /**
     * Unit tests for {@link Service}
     */
    public class ServiceTest extends TestCase {

    /** Assert on the comparison ordering of the State enum since we guarantee it. */
     public void testStateOrdering() {
     // List every valid (direct) state transition.
     assertLessThan(NEW, STARTING);
     assertLessThan(NEW, TERMINATED);

     assertLessThan(STARTING, RUNNING);
     assertLessThan(STARTING, STOPPING);
     assertLessThan(STARTING, FAILED);

     assertLessThan(RUNNING, STOPPING);
     assertLessThan(RUNNING, FAILED);

     assertLessThan(STOPPING, FAILED);
     assertLessThan(STOPPING, TERMINATED);
     }

     private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
     if (a.compareTo(b) >= 0) {
     fail(String.format("Expected %s to be less than %s", a, b));
     }
     }
    }

AbstractIdleServiceTest

    /*
     * Copyright (C) 2009 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static org.truth0.Truth.ASSERT;

    import com.google.common.collect.Lists;

    import junit.framework.TestCase;

    import java.util.List;
    import java.util.concurrent.Executor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * Tests for {@link AbstractIdleService}.
     *
     * @author Chris Nokleberg
     * @author Ben Yu
     */
    public class AbstractIdleServiceTest extends TestCase {

    // Functional tests using real thread. We only verify publicly visible state.
     // Interaction assertions are done by the single-threaded unit tests.

    public static class FunctionalTest extends TestCase {

    private static class DefaultService extends AbstractIdleService {
     @Override protected void startUp() throws Exception {}
     @Override protected void shutDown() throws Exception {}
     }

    public void testServiceStartStop() throws Exception {
     AbstractIdleService service = new DefaultService();
     service.startAsync().awaitRunning();
     assertEquals(Service.State.RUNNING, service.state());
     service.stopAsync().awaitTerminated();
     assertEquals(Service.State.TERMINATED, service.state());
     }

    public void testStart_failed() throws Exception {
     final Exception exception = new Exception("deliberate");
     AbstractIdleService service = new DefaultService() {
     @Override protected void startUp() throws Exception {
     throw exception;
     }
     };
     try {
     service.startAsync().awaitRunning();
     fail();
     } catch (RuntimeException e) {
     assertSame(exception, e.getCause());
     }
     assertEquals(Service.State.FAILED, service.state());
     }

    public void testStop_failed() throws Exception {
     final Exception exception = new Exception("deliberate");
     AbstractIdleService service = new DefaultService() {
     @Override protected void shutDown() throws Exception {
     throw exception;
     }
     };
     service.startAsync().awaitRunning();
     try {
     service.stopAsync().awaitTerminated();
     fail();
     } catch (RuntimeException e) {
     assertSame(exception, e.getCause());
     }
     assertEquals(Service.State.FAILED, service.state());
     }
     }

    public void testStart() {
     TestService service = new TestService();
     assertEquals(0, service.startUpCalled);
     service.startAsync().awaitRunning();
     assertEquals(1, service.startUpCalled);
     assertEquals(Service.State.RUNNING, service.state());
     ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
     }

    public void testStart_failed() {
     final Exception exception = new Exception("deliberate");
     TestService service = new TestService() {
     @Override protected void startUp() throws Exception {
     super.startUp();
     throw exception;
     }
     };
     assertEquals(0, service.startUpCalled);
     try {
     service.startAsync().awaitRunning();
     fail();
     } catch (RuntimeException e) {
     assertSame(exception, e.getCause());
     }
     assertEquals(1, service.startUpCalled);
     assertEquals(Service.State.FAILED, service.state());
     ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
     }

    public void testStop_withoutStart() {
     TestService service = new TestService();
     service.stopAsync().awaitTerminated();
     assertEquals(0, service.startUpCalled);
     assertEquals(0, service.shutDownCalled);
     assertEquals(Service.State.TERMINATED, service.state());
     ASSERT.that(service.transitionStates).isEmpty();
     }

    public void testStop_afterStart() {
     TestService service = new TestService();
     service.startAsync().awaitRunning();
     assertEquals(1, service.startUpCalled);
     assertEquals(0, service.shutDownCalled);
     service.stopAsync().awaitTerminated();
     assertEquals(1, service.startUpCalled);
     assertEquals(1, service.shutDownCalled);
     assertEquals(Service.State.TERMINATED, service.state());
     ASSERT.that(service.transitionStates)
     .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
     }

    public void testStop_failed() {
     final Exception exception = new Exception("deliberate");
     TestService service = new TestService() {
     @Override protected void shutDown() throws Exception {
     super.shutDown();
     throw exception;
     }
     };
     service.startAsync().awaitRunning();
     assertEquals(1, service.startUpCalled);
     assertEquals(0, service.shutDownCalled);
     try {
     service.stopAsync().awaitTerminated();
     fail();
     } catch (RuntimeException e) {
     assertSame(exception, e.getCause());
     }
     assertEquals(1, service.startUpCalled);
     assertEquals(1, service.shutDownCalled);
     assertEquals(Service.State.FAILED, service.state());
     ASSERT.that(service.transitionStates)
     .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
     }

    public void testServiceToString() {
     AbstractIdleService service = new TestService();
     assertEquals("TestService [NEW]", service.toString());
     service.startAsync().awaitRunning();
     assertEquals("TestService [RUNNING]", service.toString());
     service.stopAsync().awaitTerminated();
     assertEquals("TestService [TERMINATED]", service.toString());
     }

    public void testTimeout() throws Exception {
     // Create a service whose executor will never run its commands
     Service service = new TestService() {
     @Override protected Executor executor() {
     return new Executor() {
     @Override public void execute(Runnable command) {}
     };
     }
     };
     try {
     service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
     fail("Expected timeout");
     } catch (TimeoutException e) {
     ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
     }
     }

    private static class TestService extends AbstractIdleService {
     int startUpCalled = 0;
     int shutDownCalled = 0;
     final List<State> transitionStates = Lists.newArrayList();

    @Override protected void startUp() throws Exception {
     assertEquals(0, startUpCalled);
     assertEquals(0, shutDownCalled);
     startUpCalled++;
     assertEquals(State.STARTING, state());
     }

    @Override protected void shutDown() throws Exception {
     assertEquals(1, startUpCalled);
     assertEquals(0, shutDownCalled);
     shutDownCalled++;
     assertEquals(State.STOPPING, state());
     }

    @Override protected Executor executor() {
     transitionStates.add(state());
     return MoreExecutors.sameThreadExecutor();
     }
     }
    }

AbstractScheduledServiceTest

    /*
     * Copyright (C) 2011 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
    import com.google.common.util.concurrent.Service.State;

    import junit.framework.TestCase;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Unit test for {@link AbstractScheduledService}.
     *
     * @author Luke Sandberg
     */

    public class AbstractScheduledServiceTest extends TestCase {

    volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
     volatile ScheduledFuture<?> future = null;

    volatile boolean atFixedRateCalled = false;
     volatile boolean withFixedDelayCalled = false;
     volatile boolean scheduleCalled = false;

    final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
     @Override
     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
     long delay, TimeUnit unit) {
     return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
     };

    public void testServiceStartStop() throws Exception {
     NullService service = new NullService();
     service.startAsync().awaitRunning();
     assertFalse(future.isDone());
     service.stopAsync().awaitTerminated();
     assertTrue(future.isCancelled());
     }

    private class NullService extends AbstractScheduledService {
     @Override protected void runOneIteration() throws Exception {}
     @Override protected Scheduler scheduler() { return configuration; }
     @Override protected ScheduledExecutorService executor() { return executor; }
     }

    public void testFailOnExceptionFromRun() throws Exception {
     TestService service = new TestService();
     service.runException = new Exception();
     service.startAsync().awaitRunning();
     service.runFirstBarrier.await();
     service.runSecondBarrier.await();
     try {
     future.get();
     fail();
     } catch (ExecutionException e) {
     // An execution exception holds a runtime exception (from throwables.propogate) that holds our
     // original exception.
     assertEquals(service.runException, e.getCause().getCause());
     }
     assertEquals(service.state(), Service.State.FAILED);
     }

    public void testFailOnExceptionFromStartUp() {
     TestService service = new TestService();
     service.startUpException = new Exception();
     try {
     service.startAsync().awaitRunning();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(service.startUpException, e.getCause());
     }
     assertEquals(0, service.numberOfTimesRunCalled.get());
     assertEquals(Service.State.FAILED, service.state());
     }

    public void testFailOnExceptionFromShutDown() throws Exception {
     TestService service = new TestService();
     service.shutDownException = new Exception();
     service.startAsync().awaitRunning();
     service.runFirstBarrier.await();
     service.stopAsync();
     service.runSecondBarrier.await();
     try {
     service.awaitTerminated();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(service.shutDownException, e.getCause());
     }
     assertEquals(Service.State.FAILED, service.state());
     }

    public void testRunOneIterationCalledMultipleTimes() throws Exception {
     TestService service = new TestService();
     service.startAsync().awaitRunning();
     for (int i = 1; i < 10; i++) {
     service.runFirstBarrier.await();
     assertEquals(i, service.numberOfTimesRunCalled.get());
     service.runSecondBarrier.await();
     }
     service.runFirstBarrier.await();
     service.stopAsync();
     service.runSecondBarrier.await();
     service.stopAsync().awaitTerminated();
     }

    public void testExecutorOnlyCalledOnce() throws Exception {
     TestService service = new TestService();
     service.startAsync().awaitRunning();
     // It should be called once during startup.
     assertEquals(1, service.numberOfTimesExecutorCalled.get());
     for (int i = 1; i < 10; i++) {
     service.runFirstBarrier.await();
     assertEquals(i, service.numberOfTimesRunCalled.get());
     service.runSecondBarrier.await();
     }
     service.runFirstBarrier.await();
     service.stopAsync();
     service.runSecondBarrier.await();
     service.stopAsync().awaitTerminated();
     // Only called once overall.
     assertEquals(1, service.numberOfTimesExecutorCalled.get());
     }

    public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
     final CountDownLatch terminationLatch = new CountDownLatch(1);
     AbstractScheduledService service = new AbstractScheduledService() {
     volatile ScheduledExecutorService executorService;
     @Override protected void runOneIteration() throws Exception {}

    @Override protected ScheduledExecutorService executor() {
     if (executorService == null) {
     executorService = super.executor();
     // Add a listener that will be executed after the listener that shuts down the executor.
     addListener(new Listener() {
     @Override public void terminated(State from) {
     terminationLatch.countDown();
     }
     }, MoreExecutors.sameThreadExecutor());
     }
     return executorService;
     }

    @Override protected Scheduler scheduler() {
     return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
     }};

    service.startAsync();
     assertFalse(service.executor().isShutdown());
     service.awaitRunning();
     service.stopAsync();
     terminationLatch.await();
     assertTrue(service.executor().isShutdown());
     assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
     }

    public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
     final CountDownLatch failureLatch = new CountDownLatch(1);
     AbstractScheduledService service = new AbstractScheduledService() {
     volatile ScheduledExecutorService executorService;
     @Override protected void runOneIteration() throws Exception {}

    @Override protected void startUp() throws Exception {
     throw new Exception("Failed");
     }

    @Override protected ScheduledExecutorService executor() {
     if (executorService == null) {
     executorService = super.executor();
     // Add a listener that will be executed after the listener that shuts down the executor.
     addListener(new Listener() {
     @Override public void failed(State from, Throwable failure) {
     failureLatch.countDown();
     }
     }, MoreExecutors.sameThreadExecutor());
     }
     return executorService;
     }

    @Override protected Scheduler scheduler() {
     return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
     }};

    try {
     service.startAsync().awaitRunning();
     fail("Expected service to fail during startup");
     } catch (IllegalStateException expected) {}
     failureLatch.await();
     assertTrue(service.executor().isShutdown());
     assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
     }

    public void testSchedulerOnlyCalledOnce() throws Exception {
     TestService service = new TestService();
     service.startAsync().awaitRunning();
     // It should be called once during startup.
     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
     for (int i = 1; i < 10; i++) {
     service.runFirstBarrier.await();
     assertEquals(i, service.numberOfTimesRunCalled.get());
     service.runSecondBarrier.await();
     }
     service.runFirstBarrier.await();
     service.stopAsync();
     service.runSecondBarrier.await();
     service.awaitTerminated();
     // Only called once overall.
     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
     }

    private class TestService extends AbstractScheduledService {
     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);

    volatile boolean startUpCalled = false;
     volatile boolean shutDownCalled = false;
     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
     volatile Exception runException = null;
     volatile Exception startUpException = null;
     volatile Exception shutDownException = null;

    @Override
     protected void runOneIteration() throws Exception {
     assertTrue(startUpCalled);
     assertFalse(shutDownCalled);
     numberOfTimesRunCalled.incrementAndGet();
     assertEquals(State.RUNNING, state());
     runFirstBarrier.await();
     runSecondBarrier.await();
     if (runException != null) {
     throw runException;
     }
     }

    @Override
     protected void startUp() throws Exception {
     assertFalse(startUpCalled);
     assertFalse(shutDownCalled);
     startUpCalled = true;
     assertEquals(State.STARTING, state());
     if (startUpException != null) {
     throw startUpException;
     }
     }

    @Override
     protected void shutDown() throws Exception {
     assertTrue(startUpCalled);
     assertFalse(shutDownCalled);
     shutDownCalled = true;
     if (shutDownException != null) {
     throw shutDownException;
     }
     }

    @Override
     protected ScheduledExecutorService executor() {
     numberOfTimesExecutorCalled.incrementAndGet();
     return executor;
     }

    @Override
     protected Scheduler scheduler() {
     numberOfTimesSchedulerCalled.incrementAndGet();
     return configuration;
     }
     }

    public static class SchedulerTest extends TestCase {
     // These constants are arbitrary and just used to make sure that the correct method is called
     // with the correct parameters.
     private static final int initialDelay = 10;
     private static final int delay = 20;
     private static final TimeUnit unit = TimeUnit.MILLISECONDS;

    // Unique runnable object used for comparison.
     final Runnable testRunnable = new Runnable() {@Override public void run() {}};
     boolean called = false;

    private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
     long delay, TimeUnit unit) {
     assertFalse(called); // only called once.
     called = true;
     assertEquals(SchedulerTest.initialDelay, initialDelay);
     assertEquals(SchedulerTest.delay, delay);
     assertEquals(SchedulerTest.unit, unit);
     assertEquals(testRunnable, command);
     }

    public void testFixedRateSchedule() {
     Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
     schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
     @Override
     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
     long period, TimeUnit unit) {
     assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
     return null;
     }
     }, testRunnable);
     assertTrue(called);
     }

    public void testFixedDelaySchedule() {
     Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
     schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
     @Override
     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
     long delay, TimeUnit unit) {
     assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
     return null;
     }
     }, testRunnable);
     assertTrue(called);
     }

    private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
     public AtomicInteger scheduleCounter = new AtomicInteger(0);
     @Override
     protected Schedule getNextSchedule() throws Exception {
     scheduleCounter.incrementAndGet();
     return new Schedule(0, TimeUnit.SECONDS);
     }
     }

    public void testCustomSchedule_startStop() throws Exception {
     final CyclicBarrier firstBarrier = new CyclicBarrier(2);
     final CyclicBarrier secondBarrier = new CyclicBarrier(2);
     final AtomicBoolean shouldWait = new AtomicBoolean(true);
     Runnable task = new Runnable() {
     @Override public void run() {
     try {
     if (shouldWait.get()) {
     firstBarrier.await();
     secondBarrier.await();
     }
     } catch (Exception e) {
     throw new RuntimeException(e);
     }
     }
     };
     TestCustomScheduler scheduler = new TestCustomScheduler();
     Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
     firstBarrier.await();
     assertEquals(1, scheduler.scheduleCounter.get());
     secondBarrier.await();
     firstBarrier.await();
     assertEquals(2, scheduler.scheduleCounter.get());
     shouldWait.set(false);
     secondBarrier.await();
     future.cancel(false);
     }

    public void testCustomSchedulerServiceStop() throws Exception {
     TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
     service.startAsync().awaitRunning();
     service.firstBarrier.await();
     assertEquals(1, service.numIterations.get());
     service.stopAsync();
     service.secondBarrier.await();
     service.awaitTerminated();
     // Sleep for a while just to ensure that our task wasn't called again.
     Thread.sleep(unit.toMillis(3 * delay));
     assertEquals(1, service.numIterations.get());
     }

    public void testBig() throws Exception {
     TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
     @Override protected Scheduler scheduler() {
     return new AbstractScheduledService.CustomScheduler() {
     @Override
     protected Schedule getNextSchedule() throws Exception {
     // Explicitly yield to increase the probability of a pathological scheduling.
     Thread.yield();
     return new Schedule(0, TimeUnit.SECONDS);
     }
     };
     }
     };
     service.useBarriers = false;
     service.startAsync().awaitRunning();
     Thread.sleep(50);
     service.useBarriers = true;
     service.firstBarrier.await();
     int numIterations = service.numIterations.get();
     service.stopAsync();
     service.secondBarrier.await();
     service.awaitTerminated();
     assertEquals(numIterations, service.numIterations.get());
     }

    private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
     final AtomicInteger numIterations = new AtomicInteger(0);
     volatile boolean useBarriers = true;
     final CyclicBarrier firstBarrier = new CyclicBarrier(2);
     final CyclicBarrier secondBarrier = new CyclicBarrier(2);

    @Override protected void runOneIteration() throws Exception {
     numIterations.incrementAndGet();
     if (useBarriers) {
     firstBarrier.await();
     secondBarrier.await();
     }
     }

    @Override protected ScheduledExecutorService executor() {
     // use a bunch of threads so that weird overlapping schedules are more likely to happen.
     return Executors.newScheduledThreadPool(10);
     }

    @Override protected void startUp() throws Exception {}

    @Override protected void shutDown() throws Exception {}

    @Override protected Scheduler scheduler() {
     return new CustomScheduler() {
     @Override
     protected Schedule getNextSchedule() throws Exception {
     return new Schedule(delay, unit);
     }};
     }
     }

    public void testCustomSchedulerFailure() throws Exception {
     TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
     service.startAsync().awaitRunning();
     for (int i = 1; i < 4; i++) {
     service.firstBarrier.await();
     assertEquals(i, service.numIterations.get());
     service.secondBarrier.await();
     }
     Thread.sleep(1000);
     try {
     service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
     fail();
     } catch (IllegalStateException e) {
     assertEquals(State.FAILED, service.state());
     }
     }

    private static class TestFailingCustomScheduledService extends AbstractScheduledService {
     final AtomicInteger numIterations = new AtomicInteger(0);
     final CyclicBarrier firstBarrier = new CyclicBarrier(2);
     final CyclicBarrier secondBarrier = new CyclicBarrier(2);

    @Override protected void runOneIteration() throws Exception {
     numIterations.incrementAndGet();
     firstBarrier.await();
     secondBarrier.await();
     }

    @Override protected ScheduledExecutorService executor() {
     // use a bunch of threads so that weird overlapping schedules are more likely to happen.
     return Executors.newScheduledThreadPool(10);
     }

    @Override protected Scheduler scheduler() {
     return new CustomScheduler() {
     @Override
     protected Schedule getNextSchedule() throws Exception {
     if (numIterations.get() > 2) {
     throw new IllegalStateException("Failed");
     }
     return new Schedule(delay, unit);
     }};
     }
     }
     }
    }

AbstractServiceTest

    /*
     * Copyright (C) 2009 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static java.lang.Thread.currentThread;
    import static java.util.concurrent.TimeUnit.SECONDS;

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.Iterables;
    import com.google.common.collect.Lists;
    import com.google.common.util.concurrent.Service.Listener;
    import com.google.common.util.concurrent.Service.State;

    import junit.framework.TestCase;

    import java.lang.Thread.UncaughtExceptionHandler;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    import javax.annotation.concurrent.GuardedBy;

    /**
     * Unit test for {@link AbstractService}.
     *
     * @author Jesse Wilson
     */
    public class AbstractServiceTest extends TestCase {

    private Thread executionThread;
     private Throwable thrownByExecutionThread;

    public void testNoOpServiceStartStop() throws Exception {
     NoOpService service = new NoOpService();
     RecordingListener listener = RecordingListener.record(service);

    assertEquals(State.NEW, service.state());
     assertFalse(service.isRunning());
     assertFalse(service.running);

    service.startAsync();
     assertEquals(State.RUNNING, service.state());
     assertTrue(service.isRunning());
     assertTrue(service.running);

    service.stopAsync();
     assertEquals(State.TERMINATED, service.state());
     assertFalse(service.isRunning());
     assertFalse(service.running);
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.STOPPING,
     State.TERMINATED),
     listener.getStateHistory());
     }

    public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
     NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());
     }

    public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
     NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());
     }

    public void testNoOpServiceStopIdempotence() throws Exception {
     NoOpService service = new NoOpService();
     RecordingListener listener = RecordingListener.record(service);
     service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.stopAsync();
     service.stopAsync();
     assertEquals(State.TERMINATED, service.state());
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.STOPPING,
     State.TERMINATED),
     listener.getStateHistory());
     }

    public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
     NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();

    service.stopAsync().awaitTerminated();
     service.stopAsync();
     assertEquals(State.TERMINATED, service.state());
     }

    public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
     NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
     service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());
     }

    public void testNoOpServiceStartStopAndWaitUninterruptible()
     throws Exception {
     NoOpService service = new NoOpService();

    currentThread().interrupt();
     try {
     service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());

    assertTrue(currentThread().isInterrupted());
     } finally {
     Thread.interrupted(); // clear interrupt for future tests
     }
     }

    private static class NoOpService extends AbstractService {
     boolean running = false;

    @Override protected void doStart() {
     assertFalse(running);
     running = true;
     notifyStarted();
     }

    @Override protected void doStop() {
     assertTrue(running);
     running = false;
     notifyStopped();
     }
     }

    public void testManualServiceStartStop() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
     assertEquals(State.STARTING, service.state());
     assertFalse(service.isRunning());
     assertTrue(service.doStartCalled);

    service.notifyStarted(); // usually this would be invoked by another thread
     assertEquals(State.RUNNING, service.state());
     assertTrue(service.isRunning());

    service.stopAsync();
     assertEquals(State.STOPPING, service.state());
     assertFalse(service.isRunning());
     assertTrue(service.doStopCalled);

    service.notifyStopped(); // usually this would be invoked by another thread
     assertEquals(State.TERMINATED, service.state());
     assertFalse(service.isRunning());
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.STOPPING,
     State.TERMINATED),
     listener.getStateHistory());

    }

    public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
     service.notifyStarted();
     service.notifyStopped();
     assertEquals(State.TERMINATED, service.state());
     assertFalse(service.isRunning());
     assertFalse(service.doStopCalled);

    assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.TERMINATED),
     listener.getStateHistory());
     }

    public void testManualServiceStopWhileStarting() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
     assertEquals(State.STARTING, service.state());
     assertFalse(service.isRunning());
     assertTrue(service.doStartCalled);

    service.stopAsync();
     assertEquals(State.STOPPING, service.state());
     assertFalse(service.isRunning());
     assertFalse(service.doStopCalled);

    service.notifyStarted();
     assertEquals(State.STOPPING, service.state());
     assertFalse(service.isRunning());
     assertTrue(service.doStopCalled);

    service.notifyStopped();
     assertEquals(State.TERMINATED, service.state());
     assertFalse(service.isRunning());
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.STOPPING,
     State.TERMINATED),
     listener.getStateHistory());
     }

    /**
     * This tests for a bug where if {@link Service#stopAsync()} was called while the service was
     * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get
     * called multiple times.
     */
     public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     final AtomicInteger stopppingCount = new AtomicInteger();
     service.addListener(new Listener() {
     @Override public void stopping(State from) {
     stopppingCount.incrementAndGet();
     }
     }, MoreExecutors.sameThreadExecutor());

    service.startAsync();
     service.stopAsync();
     assertEquals(1, stopppingCount.get());
     service.stopAsync();
     assertEquals(1, stopppingCount.get());
     }

    public void testManualServiceStopWhileNew() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);

    service.stopAsync();
     assertEquals(State.TERMINATED, service.state());
     assertFalse(service.isRunning());
     assertFalse(service.doStartCalled);
     assertFalse(service.doStopCalled);
     assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
     }

    public void testManualServiceFailWhileStarting() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);
     service.startAsync();
     service.notifyFailed(EXCEPTION);
     assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
     }

    public void testManualServiceFailWhileRunning() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);
     service.startAsync();
     service.notifyStarted();
     service.notifyFailed(EXCEPTION);
     assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
     listener.getStateHistory());
     }

    public void testManualServiceFailWhileStopping() throws Exception {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener listener = RecordingListener.record(service);
     service.startAsync();
     service.notifyStarted();
     service.stopAsync();
     service.notifyFailed(EXCEPTION);
     assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
     listener.getStateHistory());
     }

    public void testManualServiceUnrequestedStop() {
     ManualSwitchedService service = new ManualSwitchedService();

    service.startAsync();

    service.notifyStarted();
     assertEquals(State.RUNNING, service.state());
     assertTrue(service.isRunning());
     assertFalse(service.doStopCalled);

    service.notifyStopped();
     assertEquals(State.TERMINATED, service.state());
     assertFalse(service.isRunning());
     assertFalse(service.doStopCalled);
     }

    /**
     * The user of this service should call {@link #notifyStarted} and {@link
     * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
     */
     private static class ManualSwitchedService extends AbstractService {
     boolean doStartCalled = false;
     boolean doStopCalled = false;

    @Override protected void doStart() {
     assertFalse(doStartCalled);
     doStartCalled = true;
     }

    @Override protected void doStop() {
     assertFalse(doStopCalled);
     doStopCalled = true;
     }
     }

    public void testAwaitTerminated() throws Exception {
     final NoOpService service = new NoOpService();
     Thread waiter = new Thread() {
     @Override public void run() {
     service.awaitTerminated();
     }
     };
     waiter.start();
     service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());
     service.stopAsync();
     waiter.join(100); // ensure that the await in the other thread is triggered
     assertFalse(waiter.isAlive());
     }

    public void testAwaitTerminated_FailedService() throws Exception {
     final ManualSwitchedService service = new ManualSwitchedService();
     final AtomicReference<Throwable> exception = Atomics.newReference();
     Thread waiter = new Thread() {
     @Override public void run() {
     try {
     service.awaitTerminated();
     fail("Expected an IllegalStateException");
     } catch (Throwable t) {
     exception.set(t);
     }
     }
     };
     waiter.start();
     service.startAsync();
     service.notifyStarted();
     assertEquals(State.RUNNING, service.state());
     service.notifyFailed(EXCEPTION);
     assertEquals(State.FAILED, service.state());
     waiter.join(100);
     assertFalse(waiter.isAlive());
     assertTrue(exception.get() instanceof IllegalStateException);
     assertEquals(EXCEPTION, exception.get().getCause());
     }

    public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
     ThreadedService service = new ThreadedService();
     RecordingListener listener = RecordingListener.record(service);
     service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.STOPPING,
     State.TERMINATED),
     listener.getStateHistory());
     }

    public void testThreadedServiceStopIdempotence() throws Throwable {
     ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync();
     service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
     }

    public void testThreadedServiceStopIdempotenceAfterWait()
     throws Throwable {
     ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
     service.stopAsync();
     assertEquals(State.TERMINATED, service.state());

    executionThread.join();

    throwIfSet(thrownByExecutionThread);
     }

    public void testThreadedServiceStopIdempotenceDoubleWait()
     throws Throwable {
     ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
     assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
     service.stopAsync().awaitTerminated();
     assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
     }

    public void testManualServiceFailureIdempotence() {
     ManualSwitchedService service = new ManualSwitchedService();
     RecordingListener.record(service);
     service.startAsync();
     service.notifyFailed(new Exception("1"));
     service.notifyFailed(new Exception("2"));
     assertEquals("1", service.failureCause().getMessage());
     try {
     service.awaitRunning();
     fail();
     } catch (IllegalStateException e) {
     assertEquals("1", e.getCause().getMessage());
     }
     }

    private class ThreadedService extends AbstractService {
     final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);

    /*
     * The main test thread tries to stop() the service shortly after
     * confirming that it is running. Meanwhile, the service itself is trying
     * to confirm that it is running. If the main thread's stop() call happens
     * before it has the chance, the test will fail. To avoid this, the main
     * thread calls this method, which waits until the service has performed
     * its own "running" check.
     */
     void awaitRunChecks() throws InterruptedException {
     assertTrue("Service thread hasn't finished its checks. "
     + "Exception status (possibly stale): " + thrownByExecutionThread,
     hasConfirmedIsRunning.await(10, SECONDS));
     }

    @Override protected void doStart() {
     assertEquals(State.STARTING, state());
     invokeOnExecutionThreadForTest(new Runnable() {
     @Override public void run() {
     assertEquals(State.STARTING, state());
     notifyStarted();
     assertEquals(State.RUNNING, state());
     hasConfirmedIsRunning.countDown();
     }
     });
     }

    @Override protected void doStop() {
     assertEquals(State.STOPPING, state());
     invokeOnExecutionThreadForTest(new Runnable() {
     @Override public void run() {
     assertEquals(State.STOPPING, state());
     notifyStopped();
     assertEquals(State.TERMINATED, state());
     }
     });
     }
     }

    private void invokeOnExecutionThreadForTest(Runnable runnable) {
     executionThread = new Thread(runnable);
     executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
     @Override
     public void uncaughtException(Thread thread, Throwable e) {
     thrownByExecutionThread = e;
     }
     });
     executionThread.start();
     }

    private static void throwIfSet(Throwable t) throws Throwable {
     if (t != null) {
     throw t;
     }
     }

    public void testStopUnstartedService() throws Exception {
     NoOpService service = new NoOpService();
     RecordingListener listener = RecordingListener.record(service);

    service.stopAsync();
     assertEquals(State.TERMINATED, service.state());

    try {
     service.startAsync();
     fail();
     } catch (IllegalStateException expected) {}
     assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
     }

    public void testFailingServiceStartAndWait() throws Exception {
     StartFailingService service = new StartFailingService();
     RecordingListener listener = RecordingListener.record(service);

    try {
     service.startAsync().awaitRunning();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(EXCEPTION, service.failureCause());
     assertEquals(EXCEPTION, e.getCause());
     }
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.FAILED),
     listener.getStateHistory());
     }

    public void testFailingServiceStopAndWait_stopFailing() throws Exception {
     StopFailingService service = new StopFailingService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync().awaitRunning();
     try {
     service.stopAsync().awaitTerminated();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(EXCEPTION, service.failureCause());
     assertEquals(EXCEPTION, e.getCause());
     }
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.STOPPING,
     State.FAILED),
     listener.getStateHistory());
     }

    public void testFailingServiceStopAndWait_runFailing() throws Exception {
     RunFailingService service = new RunFailingService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
     try {
     service.awaitRunning();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(EXCEPTION, service.failureCause());
     assertEquals(EXCEPTION, e.getCause());
     }
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.FAILED),
     listener.getStateHistory());
     }

    public void testThrowingServiceStartAndWait() throws Exception {
     StartThrowingService service = new StartThrowingService();
     RecordingListener listener = RecordingListener.record(service);

    try {
     service.startAsync().awaitRunning();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(service.exception, service.failureCause());
     assertEquals(service.exception, e.getCause());
     }
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.FAILED),
     listener.getStateHistory());
     }

    public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
     StopThrowingService service = new StopThrowingService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync().awaitRunning();
     try {
     service.stopAsync().awaitTerminated();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(service.exception, service.failureCause());
     assertEquals(service.exception, e.getCause());
     }
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.STOPPING,
     State.FAILED),
     listener.getStateHistory());
     }

    public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
     RunThrowingService service = new RunThrowingService();
     RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
     try {
     service.awaitTerminated();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(service.exception, service.failureCause());
     assertEquals(service.exception, e.getCause());
     }
     assertEquals(
     ImmutableList.of(
     State.STARTING,
     State.RUNNING,
     State.FAILED),
     listener.getStateHistory());
     }

    public void testFailureCause_throwsIfNotFailed() {
     StopFailingService service = new StopFailingService();
     try {
     service.failureCause();
     fail();
     } catch (IllegalStateException e) {
     // expected
     }
     service.startAsync().awaitRunning();
     try {
     service.failureCause();
     fail();
     } catch (IllegalStateException e) {
     // expected
     }
     try {
     service.stopAsync().awaitTerminated();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(EXCEPTION, service.failureCause());
     assertEquals(EXCEPTION, e.getCause());
     }
     }

    public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
     final StartFailingService service = new StartFailingService();
     service.startAsync();
     assertEquals(State.FAILED, service.state());
     service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
     Thread thread = new Thread() {
     @Override public void run() {
     // Internally stopAsync() grabs a lock, this could be any such method on AbstractService.
     service.stopAsync();
     }
     };
     thread.start();
     thread.join(100);
     assertFalse(thread + " is deadlocked", thread.isAlive());
     }

    public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
     final NoOpThreadedService service = new NoOpThreadedService();
     service.addListener(new Listener() {
     @Override public void running() {
     service.awaitRunning();
     }
     }, MoreExecutors.sameThreadExecutor());
     service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
     service.stopAsync();
     }

    public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
     final NoOpThreadedService service = new NoOpThreadedService();
     service.addListener(new Listener() {
     @Override public void terminated(State from) {
     service.stopAsync().awaitTerminated();
     }
     }, MoreExecutors.sameThreadExecutor());
     service.startAsync().awaitRunning();

    Thread thread = new Thread() {
     @Override public void run() {
     service.stopAsync().awaitTerminated();
     }
     };
     thread.start();
     thread.join(100);
     assertFalse(thread + " is deadlocked", thread.isAlive());
     }

    private static class NoOpThreadedService extends AbstractExecutionThreadService {
     final CountDownLatch latch = new CountDownLatch(1);
     @Override protected void run() throws Exception {
     latch.await();
     }
     @Override protected void triggerShutdown() {
     latch.countDown();
     }
     }

    private static class StartFailingService extends AbstractService {
     @Override protected void doStart() {
     notifyFailed(EXCEPTION);
     }

    @Override protected void doStop() {
     fail();
     }
     }

    private static class RunFailingService extends AbstractService {
     @Override protected void doStart() {
     notifyStarted();
     notifyFailed(EXCEPTION);
     }

    @Override protected void doStop() {
     fail();
     }
     }

    private static class StopFailingService extends AbstractService {
     @Override protected void doStart() {
     notifyStarted();
     }

    @Override protected void doStop() {
     notifyFailed(EXCEPTION);
     }
     }

    private static class StartThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
     throw exception;
     }

    @Override protected void doStop() {
     fail();
     }
     }

    private static class RunThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
     notifyStarted();
     throw exception;
     }

    @Override protected void doStop() {
     fail();
     }
     }

    private static class StopThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
     notifyStarted();
     }

    @Override protected void doStop() {
     throw exception;
     }
     }

    private static class RecordingListener extends Listener {
     static RecordingListener record(Service service) {
     RecordingListener listener = new RecordingListener(service);
     service.addListener(listener, MoreExecutors.sameThreadExecutor());
     return listener;
     }

    final Service service;

    RecordingListener(Service service) {
     this.service = service;
     }

    @GuardedBy("this")
     final List<State> stateHistory = Lists.newArrayList();
     final CountDownLatch completionLatch = new CountDownLatch(1);

    ImmutableList<State> getStateHistory() throws Exception {
     completionLatch.await();
     synchronized (this) {
     return ImmutableList.copyOf(stateHistory);
     }
     }

    @Override public synchronized void starting() {
     assertTrue(stateHistory.isEmpty());
     assertNotSame(State.NEW, service.state());
     stateHistory.add(State.STARTING);
     }

    @Override public synchronized void running() {
     assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory));
     stateHistory.add(State.RUNNING);
     service.awaitRunning();
     assertNotSame(State.STARTING, service.state());
     }

    @Override public synchronized void stopping(State from) {
     assertEquals(from, Iterables.getLast(stateHistory));
     stateHistory.add(State.STOPPING);
     if (from == State.STARTING) {
     try {
     service.awaitRunning();
     fail();
     } catch (IllegalStateException expected) {
     assertNull(expected.getCause());
     assertTrue(expected.getMessage().equals(
     "Expected the service to be RUNNING, but was STOPPING"));
     }
     }
     assertNotSame(from, service.state());
     }

    @Override public synchronized void terminated(State from) {
     assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
     stateHistory.add(State.TERMINATED);
     assertEquals(State.TERMINATED, service.state());
     if (from == State.NEW) {
     try {
     service.awaitRunning();
     fail();
     } catch (IllegalStateException expected) {
     assertNull(expected.getCause());
     assertTrue(expected.getMessage().equals(
     "Expected the service to be RUNNING, but was TERMINATED"));
     }
     }
     completionLatch.countDown();
     }

    @Override public synchronized void failed(State from, Throwable failure) {
     assertEquals(from, Iterables.getLast(stateHistory));
     stateHistory.add(State.FAILED);
     assertEquals(State.FAILED, service.state());
     assertEquals(failure, service.failureCause());
     if (from == State.STARTING) {
     try {
     service.awaitRunning();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(failure, e.getCause());
     }
     }
     try {
     service.awaitTerminated();
     fail();
     } catch (IllegalStateException e) {
     assertEquals(failure, e.getCause());
     }
     completionLatch.countDown();
     }
     }

    public void testNotifyStartedWhenNotStarting() {
     AbstractService service = new DefaultService();
     try {
     service.notifyStarted();
     fail();
     } catch (IllegalStateException expected) {}
     }

    public void testNotifyStoppedWhenNotRunning() {
     AbstractService service = new DefaultService();
     try {
     service.notifyStopped();
     fail();
     } catch (IllegalStateException expected) {}
     }

    public void testNotifyFailedWhenNotStarted() {
     AbstractService service = new DefaultService();
     try {
     service.notifyFailed(new Exception());
     fail();
     } catch (IllegalStateException expected) {}
     }

    public void testNotifyFailedWhenTerminated() {
     NoOpService service = new NoOpService();
     service.startAsync().awaitRunning();
     service.stopAsync().awaitTerminated();
     try {
     service.notifyFailed(new Exception());
     fail();
     } catch (IllegalStateException expected) {}
     }

    private static class DefaultService extends AbstractService {
     @Override protected void doStart() {}
     @Override protected void doStop() {}
     }

    private static final Exception EXCEPTION = new Exception();
    }

ServiceManagerTest

    /*
     * Copyright (C) 2012 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.google.common.util.concurrent;

    import static java.util.Arrays.asList;

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Sets;
    import com.google.common.testing.NullPointerTester;
    import com.google.common.testing.TestLogHandler;
    import com.google.common.util.concurrent.ServiceManager.Listener;

    import junit.framework.TestCase;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.logging.Formatter;
    import java.util.logging.Level;
    import java.util.logging.LogRecord;
    import java.util.logging.Logger;

    /**
     * Tests for {@link ServiceManager}.
     *
     * @author Luke Sandberg
     * @author Chris Nokleberg
     */
    public class ServiceManagerTest extends TestCase {

    private static class NoOpService extends AbstractService {
     @Override protected void doStart() {
     notifyStarted();
     }

    @Override protected void doStop() {
     notifyStopped();
     }
     }

    /*
     * A NoOp service that will delay the startup and shutdown notification for a configurable amount
     * of time.
     */
     private static class NoOpDelayedSerivce extends NoOpService {
     private long delay;

    public NoOpDelayedSerivce(long delay) {
     this.delay = delay;
     }

    @Override protected void doStart() {
     new Thread() {
     @Override public void run() {
     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
     notifyStarted();
     }
     }.start();
     }

    @Override protected void doStop() {
     new Thread() {
     @Override public void run() {
     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
     notifyStopped();
     }
     }.start();
     }
     }

    private static class FailStartService extends NoOpService {
     @Override protected void doStart() {
     notifyFailed(new IllegalStateException("failed"));
     }
     }

    private static class FailRunService extends NoOpService {
     @Override protected void doStart() {
     super.doStart();
     notifyFailed(new IllegalStateException("failed"));
     }
     }

    private static class FailStopService extends NoOpService {
     @Override protected void doStop() {
     notifyFailed(new IllegalStateException("failed"));
     }
     }

    public void testServiceStartupTimes() {
     Service a = new NoOpDelayedSerivce(150);
     Service b = new NoOpDelayedSerivce(353);
     ServiceManager serviceManager = new ServiceManager(asList(a, b));
     serviceManager.startAsync().awaitHealthy();
     ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
     assertEquals(2, startupTimes.size());
     assertTrue(startupTimes.get(a) >= 150);
     assertTrue(startupTimes.get(b) >= 353);
     }

    public void testServiceStartStop() {
     Service a = new NoOpService();
     Service b = new NoOpService();
     ServiceManager manager = new ServiceManager(asList(a, b));
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener);
     assertState(manager, Service.State.NEW, a, b);
     assertFalse(manager.isHealthy());
     manager.startAsync().awaitHealthy();
     assertState(manager, Service.State.RUNNING, a, b);
     assertTrue(manager.isHealthy());
     assertTrue(listener.healthyCalled);
     assertFalse(listener.stoppedCalled);
     assertTrue(listener.failedServices.isEmpty());
     manager.stopAsync().awaitStopped();
     assertState(manager, Service.State.TERMINATED, a, b);
     assertFalse(manager.isHealthy());
     assertTrue(listener.stoppedCalled);
     assertTrue(listener.failedServices.isEmpty());
     }

    public void testFailStart() throws Exception {
     Service a = new NoOpService();
     Service b = new FailStartService();
     Service c = new NoOpService();
     Service d = new FailStartService();
     Service e = new NoOpService();
     ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener);
     assertState(manager, Service.State.NEW, a, b, c, d, e);
     try {
     manager.startAsync().awaitHealthy();
     fail();
     } catch (IllegalStateException expected) {
     }
     assertFalse(listener.healthyCalled);
     assertState(manager, Service.State.RUNNING, a, c, e);
     assertEquals(ImmutableSet.of(b, d), listener.failedServices);
     assertState(manager, Service.State.FAILED, b, d);
     assertFalse(manager.isHealthy());

    manager.stopAsync().awaitStopped();
     assertFalse(manager.isHealthy());
     assertFalse(listener.healthyCalled);
     assertTrue(listener.stoppedCalled);
     }

    public void testFailRun() throws Exception {
     Service a = new NoOpService();
     Service b = new FailRunService();
     ServiceManager manager = new ServiceManager(asList(a, b));
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener);
     assertState(manager, Service.State.NEW, a, b);
     try {
     manager.startAsync().awaitHealthy();
     fail();
     } catch (IllegalStateException expected) {
     }
     assertTrue(listener.healthyCalled);
     assertEquals(ImmutableSet.of(b), listener.failedServices);

    manager.stopAsync().awaitStopped();
     assertState(manager, Service.State.FAILED, b);
     assertState(manager, Service.State.TERMINATED, a);

    assertTrue(listener.stoppedCalled);
     }

    public void testFailStop() throws Exception {
     Service a = new NoOpService();
     Service b = new FailStopService();
     Service c = new NoOpService();
     ServiceManager manager = new ServiceManager(asList(a, b, c));
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener);

    manager.startAsync().awaitHealthy();
     assertTrue(listener.healthyCalled);
     assertFalse(listener.stoppedCalled);
     manager.stopAsync().awaitStopped();

    assertTrue(listener.stoppedCalled);
     assertEquals(ImmutableSet.of(b), listener.failedServices);
     assertState(manager, Service.State.FAILED, b);
     assertState(manager, Service.State.TERMINATED, a, c);
     }

    public void testToString() throws Exception {
     Service a = new NoOpService();
     Service b = new FailStartService();
     ServiceManager manager = new ServiceManager(asList(a, b));
     String toString = manager.toString();
     assertTrue(toString.contains("NoOpService"));
     assertTrue(toString.contains("FailStartService"));
     }

    public void testTimeouts() throws Exception {
     Service a = new NoOpDelayedSerivce(50);
     ServiceManager manager = new ServiceManager(asList(a));
     manager.startAsync();
     try {
     manager.awaitHealthy(1, TimeUnit.MILLISECONDS);
     fail();
     } catch (TimeoutException expected) {
     }
     manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown

    manager.stopAsync();
     try {
     manager.awaitStopped(1, TimeUnit.MILLISECONDS);
     fail();
     } catch (TimeoutException expected) {
     }
     manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown
     }

    /**
     * This covers a case where if the last service to stop failed then the stopped callback would
     * never be called.
     */
     public void testSingleFailedServiceCallsStopped() {
     Service a = new FailStartService();
     ServiceManager manager = new ServiceManager(asList(a));
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener);
     try {
     manager.startAsync().awaitHealthy();
     fail();
     } catch (IllegalStateException expected) {
     }
     assertTrue(listener.stoppedCalled);
     }

    /**
     * This covers a bug where listener.healthy would get called when a single service failed during
     * startup (it occurred in more complicated cases also).
     */
     public void testFailStart_singleServiceCallsHealthy() {
     Service a = new FailStartService();
     ServiceManager manager = new ServiceManager(asList(a));
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener);
     try {
     manager.startAsync().awaitHealthy();
     fail();
     } catch (IllegalStateException expected) {
     }
     assertFalse(listener.healthyCalled);
     }

    /**
     * This covers a bug where if a listener was installed that would stop the manager if any service
     * fails and something failed during startup before service.start was called on all the services,
     * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
     * stop the timer(!).
     */
     public void testFailStart_stopOthers() throws TimeoutException {
     Service a = new FailStartService();
     Service b = new NoOpService();
     final ServiceManager manager = new ServiceManager(asList(a, b));
     manager.addListener(new Listener() {
     @Override public void failure(Service service) {
     manager.stopAsync();
     }});
     manager.startAsync();
     manager.awaitStopped(10, TimeUnit.MILLISECONDS);
     }

    private static void assertState(
     ServiceManager manager, Service.State state, Service... services) {
     Collection<Service> managerServices = manager.servicesByState().get(state);
     for (Service service : services) {
     assertEquals(service.toString(), state, service.state());
     assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
     assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
     }
     }

    /**
     * This is for covering a case where the ServiceManager would behave strangely if constructed
     * with no service under management. Listeners would never fire because the ServiceManager was
     * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy
     * makes sense.
     */
     public void testEmptyServiceManager() {
     Logger logger = Logger.getLogger(ServiceManager.class.getName());
     logger.setLevel(Level.FINEST);
     TestLogHandler logHandler = new TestLogHandler();
     logger.addHandler(logHandler);
     ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
     RecordingListener listener = new RecordingListener();
     manager.addListener(listener, MoreExecutors.sameThreadExecutor());
     manager.startAsync().awaitHealthy();
     assertTrue(manager.isHealthy());
     assertTrue(listener.healthyCalled);
     assertFalse(listener.stoppedCalled);
     assertTrue(listener.failedServices.isEmpty());
     manager.stopAsync().awaitStopped();
     assertFalse(manager.isHealthy());
     assertTrue(listener.stoppedCalled);
     assertTrue(listener.failedServices.isEmpty());
     // check that our NoOpService is not directly observable via any of the inspection methods or
     // via logging.
     assertEquals("ServiceManager{services=[]}", manager.toString());
     assertTrue(manager.servicesByState().isEmpty());
     assertTrue(manager.startupTimes().isEmpty());
     Formatter logFormatter = new Formatter() {
     @Override public String format(LogRecord record) {
     return formatMessage(record);
     }
     };
     for (LogRecord record : logHandler.getStoredLogRecords()) {
     assertFalse(logFormatter.format(record).contains("NoOpService"));
     }
     }

    /**
     * This is for a case where a long running Listener using the sameThreadListener could deadlock
     * another thread calling stopAsync().
     */

    public void testListenerDeadlock() throws InterruptedException {
     final CountDownLatch failEnter = new CountDownLatch(1);
     Service failRunService = new AbstractService() {
     @Override protected void doStart() {
     new Thread() {
     @Override public void run() {
     notifyStarted();
     notifyFailed(new Exception("boom"));
     }
     }.start();
     }
     @Override protected void doStop() {
     notifyStopped();
     }
     };
     final ServiceManager manager = new ServiceManager(
     Arrays.asList(failRunService, new NoOpService()));
     manager.addListener(new ServiceManager.Listener() {
     @Override public void failure(Service service) {
     failEnter.countDown();
     // block forever!
     Uninterruptibles.awaitUninterruptibly(new CountDownLatch(1));
     }
     }, MoreExecutors.sameThreadExecutor());
     // We do not call awaitHealthy because, due to races, that method may throw an exception. But
     // we really just want to wait for the thread to be in the failure callback so we wait for that
     // explicitly instead.
     manager.startAsync();
     failEnter.await();
     assertFalse("State should be updated before calling listeners", manager.isHealthy());
     // now we want to stop the services.
     Thread stoppingThread = new Thread() {
     @Override public void run() {
     manager.stopAsync().awaitStopped();
     }
     };
     stoppingThread.start();
     // this should be super fast since the only non stopped service is a NoOpService
     stoppingThread.join(1000);
     assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive());
     }

    /**
     * Catches a bug where when constructing a service manager failed, later interactions with the
     * service could cause IllegalStateExceptions inside the partially constructed ServiceManager.
     * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated
     * the original error (which was not constructing ServiceManager correctly).
     */
     public void testPartiallyConstructedManager() {
     Logger logger = Logger.getLogger("global");
     logger.setLevel(Level.FINEST);
     TestLogHandler logHandler = new TestLogHandler();
     logger.addHandler(logHandler);
     NoOpService service = new NoOpService();
     service.startAsync();
     try {
     new ServiceManager(Arrays.asList(service));
     fail();
     } catch (IllegalArgumentException expected) {}
     service.stopAsync();
     // Nothing was logged!
     assertEquals(0, logHandler.getStoredLogRecords().size());
     }

    public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() {
     // The implementation of this test is pretty sensitive to the implementation <img src="http://ifeve.com/wp-includes/images/smilies/frownie.png" alt=":(" class="wp-smiley" style="height: 1em; max-height: 1em;"> but we want to
     // ensure that if weird things happen during construction then we get exceptions.
     final NoOpService service1 = new NoOpService();
     // This service will start service1 when addListener is called. This simulates service1 being
     // started asynchronously.
     Service service2 = new Service() {
     final NoOpService delegate = new NoOpService();
     @Override public final void addListener(Listener listener, Executor executor) {
     service1.startAsync();
     delegate.addListener(listener, executor);
     }
     // Delegates from here on down
     @Override public final Service startAsync() {
     return delegate.startAsync();
     }

    @Override public final Service stopAsync() {
     return delegate.stopAsync();
     }

    @Override public final ListenableFuture<State> start() {
     return delegate.start();
     }

    @Override public final ListenableFuture<State> stop() {
     return delegate.stop();
     }

    @Override public State startAndWait() {
     return delegate.startAndWait();
     }

    @Override public State stopAndWait() {
     return delegate.stopAndWait();
     }

    @Override public final void awaitRunning() {
     delegate.awaitRunning();
     }

    @Override public final void awaitRunning(long timeout, TimeUnit unit)
     throws TimeoutException {
     delegate.awaitRunning(timeout, unit);
     }

    @Override public final void awaitTerminated() {
     delegate.awaitTerminated();
     }

    @Override public final void awaitTerminated(long timeout, TimeUnit unit)
     throws TimeoutException {
     delegate.awaitTerminated(timeout, unit);
     }

    @Override public final boolean isRunning() {
     return delegate.isRunning();
     }

    @Override public final State state() {
     return delegate.state();
     }

    @Override public final Throwable failureCause() {
     return delegate.failureCause();
     }
     };
     try {
     new ServiceManager(Arrays.asList(service1, service2));
     fail();
     } catch (IllegalArgumentException expected) {
     assertTrue(expected.getMessage().contains("started transitioning asynchronously"));
     }
     }

    /**
     * This test is for a case where two Service.Listener callbacks for the same service would call
     * transitionService in the wrong order due to a race. Due to the fact that it is a race this
     * test isn't guaranteed to expose the issue, but it is at least likely to become flaky if the
     * race sneaks back in, and in this case flaky means something is definitely wrong.
     *
     * <p>Before the bug was fixed this test would fail at least 30% of the time.
     */

    public void testTransitionRace() throws TimeoutException {
     for (int k = 0; k < 1000; k++) {
     List<Service> services = Lists.newArrayList();
     for (int i = 0; i < 5; i++) {
     services.add(new SnappyShutdownService(i));
     }
     ServiceManager manager = new ServiceManager(services);
     manager.startAsync().awaitHealthy();
     manager.stopAsync().awaitStopped(1, TimeUnit.SECONDS);
     }
     }

    /**
     * This service will shutdown very quickly after stopAsync is called and uses a background thread
     * so that we know that the stopping() listeners will execute on a different thread than the
     * terminated() listeners.
     */
     private static class SnappyShutdownService extends AbstractExecutionThreadService {
     final int index;
     final CountDownLatch latch = new CountDownLatch(1);

    SnappyShutdownService(int index) {
     this.index = index;
     }

    @Override protected void run() throws Exception {
     latch.await();
     }

    @Override protected void triggerShutdown() {
     latch.countDown();
     }

    @Override protected String serviceName() {
     return this.getClass().getSimpleName() + "[" + index + "]";
     }
     }

    public void testNulls() {
     ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
     new NullPointerTester()
     .setDefault(ServiceManager.Listener.class, new RecordingListener())
     .testAllPublicInstanceMethods(manager);
     }

    private static final class RecordingListener extends ServiceManager.Listener {
     volatile boolean healthyCalled;
     volatile boolean stoppedCalled;
     final Set<Service> failedServices = Sets.newConcurrentHashSet();

    @Override public void healthy() {
     healthyCalled = true;
     }

    @Override public void stopped() {
     stoppedCalled = true;
     }

    @Override public void failure(Service service) {
     failedServices.add(service);
     }
     }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351