Akka MessageDispatcher

最近在写一个调度模块,所以调研了几个开源项目的调度模型。(这里指的是应用程序层面的调度,并不是操作系统层面的 scheduling)

先从 Akka 开始吧。

0. MessageDispatcher

An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of the machine so to speak. All MessageDispatcher implementations are also an ExecutionContext, which means that they can be used to execute arbitrary code, for instance Futures.

Akka 的调度模块称为 MessageDispatcher,目前提供了三种:

  • Dispatcher

    基于事件将一组 Actors 绑定到一个线程池。

    • Sharability: Unlimited
    • Mailboxes: Any, creates one per Actor
    • Use cases: Default dispatcher, Bulkheading
    • Driven by: java.util.concurrent.ExecutorService. Specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator.
  • PinnedDispatcher

    为每个 actor 指定唯一的执行线程。具体来说,每个 actor 有其自己的线程池,并且池中只有一个线程。

    • Sharability: None
    • Mailboxes: Any, creates one per Actor
    • Use cases: Bulkheading
    • Driven by: Any akka.dispatch.ThreadPoolExecutorConfigurator. By default a “thread-pool-executor”.
  • CallingThreadDispatcher

    当前线程执行 actor 调用,也就是说调度器不创建执行线程。

    • Sharability: Unlimited
    • Mailboxes: Any, creates one per Actor per Thread (on demand)
    • Use cases: Testing
    • Driven by: The calling thread (duh)

其中有个几个名词需要解释一下:

  1. Mailbox 就是 Actor 之间传递事件的队列,也就是每个 Actor 的收件箱。
  2. ExecutorService 就是执行线程池,默认为 fork-join-executor,也可以自定义的实现,理解为 Java 标准的线程池即可。

ExecutorService、Actor、Mailbox、Dispatcher 之间的关系如下图所示:


Akka Dispatcher

1. 从入口开始

下面我们从一个简单的 Akka 示例程序开始追踪 MessageDispatcher 的逻辑。

    package com.lightbend.akka.sample;

    import akka.actor.AbstractActor;
    import akka.actor.AbstractActor.Receive;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;

    class PrintMyActorRefActor extends AbstractActor {
      @Override
      public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("printit", p -> {
              ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
              System.out.println("Second: " + secondRef);
            })
            .build();
      }
    }
    public class ActorHierarchyExperiments {
      public static void main(String[] args) throws java.io.IOException {
        ActorSystem system = ActorSystem.create("testSystem");

        ActorRef firstRef = system.actorOf(Props.create(PrintMyActorRefActor.class), "first-actor");
        System.out.println("First: " + firstRef);
        firstRef.tell("printit", ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {
          System.in.read();
        } finally {
          system.terminate();
        }
      }
    }

对于 Akka 程序首先需要初始化一个 ActorSystem,在执行完 create("testSystem") 之后,MessageDispatcher 也完成了初始化,并且开始调度 Actor 执行了。

顺着 create("testSystem") 方法调用,发现最终实例化了 ActorSystemImpl,并且执行了 start() 方法。

// akka-actor/src/main/scala/akka/actor/ActorSystem.scala

  /**
   * Scala API: Creates a new actor system with the specified name and settings
   * The core actor system settings are defined in [[BootstrapSetup]]
   */
  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
  }

默认情况下 defaultECNone

ActorSystemImpl 中就可以找到默认全局 dispatcher 的初始化过程。

// akka-actor/src/main/scala/akka/actor/ActorSystem.scala

  val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
    threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))

  val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

简而言之, Dispatchers 实现了“工厂模式”,根据不同的配置生成相应的 MessageDispatcher

// akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala

  /**
   * The one and only default dispatcher.
   */
  def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)

  /**
   * Returns a dispatcher as specified in configuration. Please note that this
   * method _may_ create and return a NEW dispatcher, _every_ call.
   *
   * Throws ConfigurationException if the specified dispatcher cannot be found in the configuration.
   */
  def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()

  private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
    dispatcherConfigurators.get(id) match {
      case null ⇒
        // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
        // That shouldn't happen often and in case it does the actual ExecutorService isn't
        // created until used, i.e. cheap.
        val newConfigurator =
          if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
          else throw new ConfigurationException(s"Dispatcher [$id] not configured")

        dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
          case null     ⇒ newConfigurator
          case existing ⇒ existing
        }

      case existing ⇒ existing
    }
  }

默认全局 dispatcher 的 idakka.actor.default-dispatcher,用户可以定义多个 dispatcher,每个 id 对应一个 MessageDispatcherConfigurator

// akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala

  /**
   * INTERNAL API
   */
  private[akka] def config(id: String): Config = {
    config(id, settings.config.getConfig(id))
  }

  /**
   * INTERNAL API
   */
  private[akka] def config(id: String, appConfig: Config): Config = {
    import scala.collection.JavaConverters._
    def simpleName = id.substring(id.lastIndexOf('.') + 1)
    idConfig(id)
      .withFallback(appConfig)
      .withFallback(ConfigFactory.parseMap(Map("name" → simpleName).asJava))
      .withFallback(defaultDispatcherConfig)
  }

每个 id 对应的配置可以在多个层面进行重写,我们暂且忽略配置相关的代码,以默认配置为例继续跟踪。

2. Dispatcher 初始化

Dispatchers

2.1 构建 MessageDispatcherConfigurator

初始化 MessageDispatcherConfigurator 的逻辑在 configuratorFrom 方法中。

// akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala

  private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
    if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)

    cfg.getString("type") match {
      case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
      case "BalancingDispatcher" ⇒
        // FIXME remove this case in 2.4
        throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
          "During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
          classOf[BalancingDispatcherConfigurator].getName)
      case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
      case fqn ⇒
        val args = List(classOf[Config] → cfg, classOf[DispatcherPrerequisites] → prerequisites)
        prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
          case exception ⇒
            throw new ConfigurationException(
              ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
                "make sure it has constructor with [com.typesafe.config.Config] and " +
                "[akka.dispatch.DispatcherPrerequisites] parameters")
                .format(fqn, cfg.getString("id")), exception)
        }).get
    }
  }

type 的默认值为 Dispatcher

/**
 * Configurator for creating [[akka.dispatch.Dispatcher]].
 * Returns the same dispatcher instance for for each invocation
 * of the `dispatcher()` method.
 */
class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends MessageDispatcherConfigurator(config, prerequisites) {

  private val instance = new Dispatcher(
    this,
    config.getString("id"),
    config.getInt("throughput"),
    config.getNanosDuration("throughput-deadline-time"),
    configureExecutor(),
    config.getMillisDuration("shutdown-timeout"))

  /**
   * Returns the same dispatcher instance for each invocation
   */
  override def dispatcher(): MessageDispatcher = instance
}

配置 executor 的逻辑在父类的 configureExecutor 中。

// akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala

  def configureExecutor(): ExecutorServiceConfigurator = {
    def configurator(executor: String): ExecutorServiceConfigurator = executor match {
      case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
      case "thread-pool-executor"           ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
      case "affinity-pool-executor"         ⇒ new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites)

      case fqcn ⇒
        val args = List(
          classOf[Config] → config,
          classOf[DispatcherPrerequisites] → prerequisites)
        prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
          case exception ⇒ throw new IllegalArgumentException(
            ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
                make sure it has an accessible constructor with a [%s,%s] signature""")
              .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
        }).get
    }

    config.getString("executor") match {
      case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
      case other              ⇒ configurator(other)
    }
  }

executor 默认为 fork-join-executor,其中的细节暂且忽略,理解为一个线程池即可。

2.2 构建 MessageDispatcher

Dispatcher 的初始化过程主要是创建执行线程池——executorService

// akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala

  private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {
    lazy val executor: ExecutorService = factory.createExecutorService
    def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory)
  }

  @volatile private var executorServiceDelegate: LazyExecutorServiceDelegate =
    new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, threadFactory))

  protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate

也就是通过传入的 ExecutorServiceConfigurator 创建对应的线程池工厂,然后通过 lazy 机制在使用时创建线程池。

3. Dispatcher 使用

3.1 Actor 的 start()

继续追踪 ActorSystem 的 create("testSystem") 方法调用,下一步是 start() 方法。

// akka-actor/src/main/scala/akka/actor/ActorSystem.scala

  private lazy val _start: this.type = try {
    registerOnTermination(stopScheduler())
    // the provider is expected to start default loggers, LocalActorRefProvider does this
    provider.init(this)
    // at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise
    _initialized = true

    if (settings.LogDeadLetters > 0)
      logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
    eventStream.startUnsubscriber()
    loadExtensions()
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) ⇒
      try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) }
      throw e
  }

无关的细节先忽略,关于 dispatcher 的重点在 provider.init(this) 中。

// akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala

  private[akka] def init(_system: ActorSystemImpl) {
    system = _system
    rootGuardian.start()
    // chain death watchers so that killing guardian stops the application
    systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
    rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
    eventStream.startDefaultLoggers(_system)
  }

这里涉及三个系统 Actor —— rootGuardiansystemGuardianguardian

首先启动 rootGuardian,实际上所有 Actor 的 start() 流程都是一致的。(Router略有不同,我们暂且忽略)

// akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala

  /**
   * Start this cell, i.e. attach it to the dispatcher.
   */
  def start(): this.type = {
    // This call is expected to start off the actor by scheduling its mailbox.
    dispatcher.attach(this)
    this
  }
// akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala

  /**
   * Attaches the specified actor instance to this dispatcher, which includes
   * scheduling it to run for the first time (Create() is expected to have
   * been enqueued by the ActorCell upon mailbox creation).
   */
  final def attach(actor: ActorCell): Unit = {
    register(actor)
    registerForExecution(actor.mailbox, false, true)
  }

注意:到了最核心的代码了 —— registerForExecution

// akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala

  /**
   * Returns if it was registered
   *
   * INTERNAL API
   */
  protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
    if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
      if (mbox.setAsScheduled()) {
        try {
          executorService execute mbox
          true
        } catch {
          case e: RejectedExecutionException ⇒
            try {
              executorService execute mbox
              true
            } catch { //Retry once
              case e: RejectedExecutionException ⇒
                mbox.setAsIdle()
                eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
                throw e
            }
        }
      } else false
    } else false
  }
  1. 判断该 actor 是否可以调度执行
  2. 设置该 actor 为 Scheduled 状态
  3. 将该 actor 的 mbox 提交给线程池
  4. 如果两次提交失败,那么将状态设置为 Idle

3.2 循环 registerForExecution

rootGuardian.start() 过程中的这次注册执行肯定是可以成功提交的,我们接着看 Mailboxrun() 方法。(对于提交失败的情况稍后再分析)

// akka-actor/src/main/scala/akka/dispatch/Mailbox.scala

  override final def run(): Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

如果没有关闭,那么处理系统消息和收件箱。最后在 finally 中再一次调用了 dispatcher 的 registerForExecution

如果始终可以成功注册执行的话,那么本质上就是一个处理队列消息的死循环了,可以说是一个挺精巧的设计

但是 Akka 的调度模型并没有优先级的概念,只是依赖线程池的任务队列实现了FIFO;在时间片上做了一些控制,但是由于 JVM 没有提供协程,实际上也比较难精确控制。

// akka-actor/src/main/scala/akka/dispatch/Mailbox.scala

  /**
   * Process the messages in the mailbox
   */
  @tailrec private final def processMailbox(
    left:       Int  = java.lang.Math.max(dispatcher.throughput, 1),
    deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
    if (shouldProcessMessage) {
      val next = dequeue()
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor invoke next
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
          processMailbox(left - 1, deadlineNs)
      }
    }

processMailbox 方法中实现了时间片控制:throughput 控制一次处理的消息条数,throughputDeadlineTime 控制一次处理的时间片,达到限制条件就让出线程,重新提交自己。

3.3 registerForExecution 失败

回过头来再看一下注册执行失败的情况:a. 不满足 canBeScheduledForExecution 条件;b. 被 Rejected 了两次。

对于不满足 canBeScheduledForExecution 条件的情况

  final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match {
    case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
    case Closed           ⇒ false
    case _                ⇒ hasSystemMessageHint || hasSystemMessages
  }

  /**
   * Set Scheduled status, keeping primary status as is.
   */
  @tailrec
  final def setAsScheduled(): Boolean = {
    val s = currentStatus
    /*
     * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
     * Scheduled bit already set.
     */
    if ((s & shouldScheduleMask) != Open) false
    else updateStatus(s, s | Scheduled) || setAsScheduled()
  }

不考虑异常状态,实际上只有队列中已经没有待处理消息了才不提交任务,此时不提交也是合适的,只是后续如果再来新消息应该主动注册执行。(这部分逻辑等到追踪发送消息代码时再分析)

对于被 Rejected 了两次的情况就比较麻烦了,如果队列中残存待处理的消息,而且又很久不再来新的消息,那么这部分残存的消息岂不是不能被处理了?

在这种情况下,如果永远没有新消息,那么确实就永远无法处理残存的消息了,这是一种很极端的情况,通常消息都是源源不断的,但是在日志中看到 "registerForExecution was rejected twice!",还是需要慎重对待的。

3.4 sendSystemMessage

回到 provider.init(this) 方法,下一步是发送系统消息systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))

// akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala

  /**
   * INTERNAL API
   */
  protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
    val mbox = receiver.mailbox
    mbox.systemEnqueue(receiver.self, invocation)
    registerForExecution(mbox, false, true)
  }

将消息发送到对应 Actor 的收件箱之后,又调用了 registerForExecution(mbox, false, true) 方法,如此确保新来的消息有机会被处理。(也就是3.3节分析的问题)

4. actorOf 与 tell

通过以上的分析已经可以知晓 dispatcher 的原理与实现了,但是其中的调用只涉及系统 Actor,实际上普通 Actor 的调用也是一样的,下面简单看一下即可。

创建普通 Actor 的 actorOf 方法

// akka-actor/src/main/scala/akka/actor/ActorSystem.scala

  def actorOf(props: Props, name: String): ActorRef =
    if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false)
    else throw new UnsupportedOperationException(
      s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")
// akka-actor/src/main/scala/akka/actor/dungeon/Children.scala

  private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef =
    makeChild(this, props, checkName(name), async = true, systemService = systemService)

  private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
    if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope)
      try {
        val ser = SerializationExtension(cell.system)
        props.args forall (arg ⇒
          arg == null ||
            arg.isInstanceOf[NoSerializationVerificationNeeded] ||
            {
              val o = arg.asInstanceOf[AnyRef]
              val serializer = ser.findSerializerFor(o)
              val bytes = serializer.toBinary(o)
              serializer match {
                case ser2: SerializerWithStringManifest ⇒
                  val manifest = ser2.manifest(o)
                  ser.deserialize(bytes, serializer.identifier, manifest).get != null
                case _ ⇒
                  ser.deserialize(bytes, arg.getClass).get != null
              }
            })
      } catch {
        case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
      }
    /*
     * in case we are currently terminating, fail external attachChild requests
     * (internal calls cannot happen anyway because we are suspended)
     */
    if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
    else {
      reserveChild(name)
      // this name will either be unreserved or overwritten with a real child below
      val actor =
        try {
          val childPath = new ChildActorPath(cell.self.path, name, ActorCell.newUid())
          cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath,
            systemService = systemService, deploy = None, lookupDeploy = true, async = async)
        } catch {
          case e: InterruptedException ⇒
            unreserveChild(name)
            Thread.interrupted() // clear interrupted flag before throwing according to java convention
            throw e
          case NonFatal(e) ⇒
            unreserveChild(name)
            throw e
        }
      // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
      if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend()
      initChild(actor)
      actor.start()
      actor
    }
  }

看到 actor.start() 就不需要继续向下看了。

发送消息的 tell 方法

// akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala

  def sendMessage(msg: Envelope): Unit =
    try {
      val msgToDispatch =
        if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
        else msg

      dispatcher.dispatch(this, msgToDispatch)
    } catch handleException
// akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala

  /**
   * INTERNAL API
   */
  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
    val mbox = receiver.mailbox
    mbox.enqueue(receiver.self, invocation)
    registerForExecution(mbox, true, false)
  }

又看到了熟悉的 registerForExecution 方法。

5. PinnedDispatcher

最后我们再看一下 PinnedDispatcher 如何实现独占线程的。

// akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala

class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends MessageDispatcherConfigurator(config, prerequisites) {

  private val threadPoolConfig: ThreadPoolConfig = configureExecutor() match {
    case e: ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig
    case other ⇒
      prerequisites.eventStream.publish(
        Warning(
          "PinnedDispatcherConfigurator",
          this.getClass,
          "PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
            config.getString("id"))))
      ThreadPoolConfig()
  }
  /**
   * Creates new dispatcher for each invocation.
   */
  override def dispatcher(): MessageDispatcher =
    new PinnedDispatcher(
      this, null, config.getString("id"),
      config.getMillisDuration("shutdown-timeout"), threadPoolConfig)

}

DispatcherConfigurator 不同的是 PinnedDispatcherConfiguratordispatcher() 方法每次都返回一个新的 dispatcher,也就是说每个 actor 都有一个私有的 dispatcher。

// akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala
class PinnedDispatcher(
  _configurator:     MessageDispatcherConfigurator,
  _actor:            ActorCell,
  _id:               String,
  _shutdownTimeout:  FiniteDuration,
  _threadPoolConfig: ThreadPoolConfig)
  extends Dispatcher(
    _configurator,
    _id,
    Int.MaxValue,
    Duration.Zero,
    _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
    _shutdownTimeout) {

  @volatile
  private var owner: ActorCell = _actor

  //Relies on an external lock provided by MessageDispatcher.attach
  protected[akka] override def register(actorCell: ActorCell) = {
    val actor = owner
    if ((actor ne null) && actorCell != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
    owner = actorCell
    super.register(actorCell)
  }
  //Relies on an external lock provided by MessageDispatcher.detach
  protected[akka] override def unregister(actor: ActorCell) = {
    super.unregister(actor)
    owner = null
  }

通过 _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1) 设置每个 dispatcher 中的线程池为一个独占的线程,所属 actor 的任务都是提交给这个私有的线程池。



如果觉得我的文章对您有用,请随意打赏。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • 持久化 当我们在集群系统中,一台机器向另一台机器发送一段数据,负责接收的机器在接收数据前突然宕机,就会造成数据丢失...
    mango_knight阅读 4,529评论 0 4
  • 首先说一下Actor Model,作为一种进程或者线程间的通信模型,一般来说有两种选择,一种是CSP,比如Go语言...
    墨弈阅读 3,370评论 0 51
  • 时有人不是问我读书习惯怎么养成就是找我推荐书,一开始我还会很认真的做解答,并去询问其喜欢什么类型,毕竟读书也是希望...
    夏筠若阅读 8,562评论 75 392
  • 现在是2017.11.12凌晨4点10分 刚吃饱 最近除了吃还是吃,其实很想睡,还是得吃,因为饿。 这样的日子估计...
    行走的依米阅读 206评论 0 0
  • 我可以原谅所有人,可就是原谅不了你,放不过你。 你就不会痛不痒不牵强了,那我呢。我记得小时候,你说你会陪着我到...
    Yin爱丽丝阅读 295评论 0 3