业务流程是服务的主要用例之一,你是否尝试以尽可能多的并发方式编排多个服务调用,因此可以获得尽可能好的响应时间,或者你尝试进行多个业务操作,数据写入,数据读取, 服务调用等等,依赖于彼此等等。简洁地描述您的业务逻辑的能力对于使服务易于理解和维护至关重要。业务流程DSL——squbs-pattern的一部分——将使异步代码易于读写和推理。
依赖
业务流DSL是squbs-pattern的一部分。请添加如下依赖:
"org.squbs" %% "squbs-pattern" % squbsVersion,
"com.typesafe.akka" %% "akka-contrib" % akkaVersion
开始
让我们开始一个简单但完整的业务流程示例。这个业务流程由3个相互关联的异步任务构成:
- 加载请求此业务流程的用户。
- 加载项目。详细信息依赖于查看的用户。
- 生成项目视图,基于用户和项目数据。
让我们深入到流程和细节:
// 1. Define the orchestrator actor.
class MyOrchestrator extends Actor with Orchestrator {
// 2. Provide the initial expectOnce block that will receive the request message.
expectOnce {
case r: MyOrchestrationRequest => orchestrate(sender(), r)
}
// 3. Define orchestrate - the orchestration function.
def orchestrate(requester: ActorRef, request: MyOrchestrationRequest) {
// 4. Compose the orchestration flow using pipes (>>) as needed by the business logic.
val userF = loadViewingUser
val itemF = userF >> loadItem(request.itemId)
val itemViewF = (userF, itemF) >> buildItemView
// 5. Conclude and send back the result of the orchestration.
for {
user <- userF
item <- itemF
itemView <- itemViewF
} {
requester ! MyOrchestrationResult(user, item, itemView)
context.stop(self)
}
// 6. Make sure to stop the orchestrator actor by calling
// context.stop(self).
}
// 7. Implement the orchestration functions as in the following patterns.
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
context.actorOf(Props[ItemActor]) ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item => itemPromise success Some(item)
case e: NoSuchItem => itemPromise success None
}
itemPromise.future
}
def loadViewingUser: OFuture[Option[User]] = {
val userPromise = OPromise[Option[User]]
...
userPromise.future
}
def buildItemView(user: Option[User], item: Option[Item]): OFuture[Option[ItemView]] = {
...
}
}
依赖
添加如下依赖:
"org.squbs" %% "squbs-pattern" % squbsVersion
核心概念
业务流程
Orchestrator是由actor继承用于支持业务流程功能的特质。从技术上讲, 它是聚合器的子特性, 并提供其所有功能。从技术上讲, 它是Aggregator的子特质, 并提供其所有功能。此外, 它还提供了功能和语法, 允许有效的业务流程组合, 以及在下面详细讨论的创建业务流程功能时经常需要的实用工具。要使用业务流程, actor可以简单地继承Orchestrator特质。
import org.squbs.pattern.orchestration.Orchestrator
class MyOrchestrator extends Actor with Orchestrator {
...
}
与Aggregator类似, 业务流程通常不声明Akka actor接收块, 但允许expect/expectOnce/unexpect 块来定义在任何点上预期的响应。这些预期代码块通常从内部业务流程函数中使用。
业务流程的Future和Promise
业务流程的promise和future与scala.concurrent.Future 和 scala.concurrent.Promise 非常相似,这里描述的指示一个名字的变化到OFuture和OPromise,
标志着它们应当在actor里用于业务流程。业务流程版本通过它们与没有并发行为的工件版本区分开来。它们在签名中不使用 (隐式) ExecutionContext, 也不用于执行。它们还缺少一些显式异步执行闭包的函数。在演员内部使用, 他们的回调绝不会超出Actor的范围之外。这将消除由于回调而从不同线程同时修改Actor状态的危险。此外, 它们还包括性能优化, 假设它们总是在Actor内部使用。
注意:不要传递一个OFuture 到actor外面。提供了隐式转换以在 scala.concurrent.Future 和 OFuture 之间进行转换。
import org.squbs.pattern.orchestration.{OFuture, OPromise}
异步的业务流程函数
业务流程函数是一个被称为orchestration 流的函数,用来执行单一业务流程任务,例如调用一个服务或者函数。业务流程功能必须符合以下准则:
1,它必须以非Future参数作为输入。根据 Scala 的当前限制, 参数的数目最多可达22。在所有情况下, 这些函数不应该有那么多参数。
2,Scala函数可以柯里化,从 piped (future)输入中直接分离输入。管道输入必须是在柯里化函数中最后一组参数。
3,它必须导致异步执行。异步执行通常通过发送消息给不同的actor来实现。
4,Scala实现必须返回一个OFuture (orchestration future)。
以下是业务流程函数的例子:
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
context.actorOf(Props[ItemActor]) ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item => itemPromise success Some(item)
case e: NoSuchItem => itemPromise success None
}
itemPromise.future
}
这个例子,函数是柯里化的。itemId 参数是同步传递的, 而发送方是异步传递的。
下面的示例在逻辑上与第一个示例相同, 只是使用 ask 而不是tell:
private def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
import context.dispatcher
implicit val timeout = Timeout(3 seconds)
(context.actorOf[ItemActor] ? ItemRequest(itemId, seller.id)).mapTo[Option[Item]]
}
此例中,ask ? 操作返回了一个scala.concurrent.Future。Orchestrator 特质提供了scala.concurrent.Future 和OFuture的隐式转换,所以ask ?的结果转换为这个函数申明的返回类型 OFuture 且无需显示调用转换。
ask 或者?似乎需要编写更少的代码, 但它既不性能, 也不像expect/expectOnce那样灵活 。预期块中的逻辑也可用于结果的进一步转换。同样可以通过ask返回的future使用回调来实现。但是, 由于以下原因, 无法轻松地补偿性能:
1,Ask将创建一个新的actor作为响应接收者。
2,从 scala.concurrent.Future 到 OFuture 的转换以及 Java API 的填充操作需要将消息发送回orchestrator, 从而添加一个消息跳转, 同时增加了延迟和 CPU。
测试显示,当使用ask,与expect/expectOnce相对时,有更高的延迟和CPU利用。
组合
管道,或者>>符号使用一个或多个业务流程future OFuture ,并使其结果作为业务流程功能的输入。当所有表示对函数输入的 OFutures 都被解析时, 实际的业务流程函数调用将异步发生。
管道是业务流程 DSL 的主要组件, 它允许根据其输入和输出组成业务流程功能。业务流程流是由业务流程声明隐式地确定的,或者通过管道来声明业务流程流。
当多个 OFutures 被输送到一个业务流程函数时, OFutures 需要以逗号分隔并括在圆括号中, 构造一个 OFutures 的元组作为输入。元组中的元素,它们的 OFuture 类型必须与函数参数和类型匹配, 或者是在柯里化的情况下的最后一组参数, 否则编译将失败。此类错误通常也由 IDE 捕获。
下面的例子展示了一个简单的业务流程声明以及使用前面章节申明的loadItem 业务流程函数:
val userF = loadViewingUser
val itemF = userF >> loadItem(request.itemId)
val itemViewF = (userF, itemF) >> buildItemView
上面的代码可以如下描述:
- 首先调用loadViewingUser (不带输入参数)
- 当查看用户可用的时候,使用查看用户作为调用loadItem 的输入(在这种情况下前面有个itemId可用)。本例中,loadItem遵循上面业务流程函数声明中确切的签名。
- 当用户和项目可用时,调用buildItemView。
业务流程实例生命周期
业务流程通常是一次性actor。它们接收初始请求, 然后根据调用的业务流程函数发出的请求进行多个响应。
为了允许一个业务流程服务多个业务流程请求,业务流程必须结合每个请求的输入和响应, 并将它们与不同的请求隔离。这将大大使其发展复杂化,并且可能不会按照这些示例中看到的清晰的业务流程表示中得到最终结果。创建一个新的actor是足够廉价的,我们能够容易地为每个业务流程请求创建一个新的业务流程actor。
业务流程回调的最后部分是停止actor。在Scala中,通过调用context.stop(self)或者context stop self (如果中缀表示法是首选)。
完成业务流程流
在这里, 我们把上述所有的概念放在一起。用更完整的解释来重复上面的例子:
// 1. Define the orchestrator actor.
class MyOrchestrator extends Actor with Orchestrator {
// 2. Provide the initial expectOnce block that will receive the request message.
// After this request message is received, the same request will not be
// expected again for the same actor.
// The expectOnce likely has one case match which is the initial request and
// uses the request arguments or members, and the sender() to call the high
// level orchestration function. This function is usually named orchestrate.
expectOnce {
case r: MyOrchestrationRequest => orchestrate(sender(), r)
}
// 3. Define orchestrate. Its arguments become immutable by default
// allowing developers to rely on the fact these will never change.
def orchestrate(requester: ActorRef, request: MyOrchestrationRequest) {
// 4. If there is anything we need to do synchronously to setup for
// the orchestration, do this in the first part of orchestrate.
// 5. Compose the orchestration flow using pipes as needed by the business logic.
val userF = loadViewingUser
val itemF = userF >> loadItem(request.itemId)
val itemViewF = (userF, itemF) >> buildItemView
// 6. End the flow by calling functions composing the response(s) back to the
// requester. If the composition is very large, it may be more readable to
// use for-comprehensions rather than a composition function with very large
// number of arguments. There may be multiple such compositions in case partial
// responses are desired. This example shows the use of a for-comprehension
// just for reference. You can also use an orchestration function with
// 3 arguments plus the requester in such small cases.
for {
user <- userF
item <- itemF
itemView <- itemViewF
} {
requester ! MyOrchestrationResult(user, item, itemView)
context.stop(self)
}
// 7. Make sure the last response stops the orchestrator actor by calling
// context.stop(self).
}
// 8. Implement the asynchronous orchestration functions inside the
// orchestrator actor, but outside the orchestrate function.
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
context.actorOf[ItemActor] ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item => itemPromise success Some(item)
case e: NoSuchItem => itemPromise success None
}
itemPromise.future
}
def loadViewingUser: OFuture[Option[User]] = {
val userPromise = OPromise[Option[User]]
...
userPromise.future
}
def buildItemView(user: Option[User], item: Option[Item]): OFuture[Option[ItemView]] = {
...
}
}
业务流程函数重用
业务流程功能通常依赖于Orchestrator特质提供的工具, 无法独立运行。但是, 在许多情况下, 需要在多个协调中重用业务流程功能, 以不同的方式进行编排。在这种情况下, 重要的是将业务流程功能分成不同的特质, 这些特质将被混合到每个业务流程中。该特性必须具有对业务流程功能的访问权限, 并且需要对Orchestrator的自我引用。下面显示了这样一个特质的示例:
trait OrchestrationFunctions { this: Actor with Orchestrator =>
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
...
}
}
上面例子Actor with Orchestrator 是一个类型自应用。它告诉Scala编译器,这个特质只能够混入到一个同样也是Orchestrator 的Actor,因此可以访问Actor和Orchestrator提供的工具。
要使用业务流程中的 OrchestrationFunctions特质, 你只需将这个特质混合到一个业务流程中, 如下所示:
class MyOrchestrator extends Actor with Orchestrator with OrchestrationFunctions {
...
}
确保响应的唯一性
当使用expect 或 expectOnce时,我们受到单个期望块的模式匹配功能的限制, 它的范围有限, 无法区分多个业务流程函数中的多个预期块之间的匹配。在同一业务流程函数中声明预期之前, 收到的请求消息中没有逻辑链接的。对于复杂的业务流程, 我们可能会遇到消息混淆的问题。响应没有与正确的请求关联, 也没有正确处理。解决这个问题有几个策略:
如果初始消息的接收者,因此响应消息的发送者是唯一的,那么匹配可能包括对消息的发送方的引用,就像在下面的示例模式匹配中一样。
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
val itemActor = context.actorOf(Props[ItemActor])
itemActor ! ItemRequest(itemId, seller.id)
expectOnce {
case item: Item if sender() == itemActor => itemPromise success Some(item)
case e: NoSuchItem if sender() == itemActor => itemPromise success None
}
itemPromise.future
}
或者, Orchestrator 特质提供了一个消息 id 生成器, 它在与参与者实例结合时是唯一的。我们可以使用这个 id 发生器生成一个唯一的消息 id。接受此类消息的Actor只需要将此消息 id 作为响应消息的一部分返回。下面的示例显示了使用消息 id 生成器的业务流程函数。
def loadItem(itemId: String)(seller: User): OFuture[Option[Item]] = {
val itemPromise = OPromise[Option[Item]]
// Generate the message id.
val msgId = nextMessageId
context.actorOf(Props[ItemActor]) ! ItemRequest(msgId, itemId, seller.id)
// Use the message id as part of the response pattern match. It needs to
// be back-quoted as to not be interpreted as variable extractions, where
// a new variable is created by extraction from the matched object.
expectOnce {
case item @ Item(`msgId`, _, _) => itemPromise success Some(item)
case NoSuchItem(`msgId`, _) => itemPromise success None
}
itemPromise.future
}