概述
我们经常跨不同服务端/客户端的公共基础结构功能。这样的基础结构包括,但不限于,日志、指标收集、请求追踪、认证/授权、 追踪、cookie 管理、A/B测试等。
随着 squbs 促进关注的分离, 此类逻辑属于基础结构, 而不是客户端实现。squbs 管道允许基础结构提供组件安装到客户端, 而无需客户自己担心这些方面。
通常讲,一个squbs管道是一个Bidi Flow ,扮演如下之间的桥梁:
- Akka HTTP 层和squbs服务
- 从Akka HTTP 到 squbs 服务发送的所有请求消息都将通过管道
- 反之亦然, 从 squbs 服务发送的所有响应消息将通过管道
- squbs客户端和Akka HTTP主机连接池流:
- 从squbs客户端到Akka HTTP连接池发送的所有请求消息将通过管道
- 反之亦然,从Akka HTTP连接池到squbs客户端所有响应消息将通过管道
管道申明
通过以下配置指定的默认前/后流将自动连接到服务器/客户端管道, 除非在单个服务/客户端配置中defaultPipeline 设置为 off:
squbs.pipeline.server.default {
pre-flow = defaultServerPreFlow
post-flow = defaultServerPostFlow
}
squbs.pipeline.client.default {
pre-flow = defaultClientPreFlow
post-flow = defaultClientPostFlow
}
服务端管道申明
在squbs-meta.conf, 你可以为服务端指定一个管道:
squbs-services = [
{
class-name = org.squbs.sample.MyActor
web-context = mypath
pipeline = dummyflow
}
]
如果没有用于 squbs 服务端的自定义管道, 则只需省略。
使用上述配置, 管线将如下所示:
RequestContext ~>| |~> | |~> | |~> | |
| default | | dummy | | default | | squbs |
| PreFlow | | flow | | PostFlow| | service |
RequestContext <~| |<~ | |<~ | |<~ | |
+---------+ +---------+ +---------+ +---------+
RequestContext 基本上是围绕 HttpRequest 和 HttpResponse的包装, 这也允许携带上下文信息。
客户端管道申明
在application.conf, 你可以为客户端指定一个管道:
sample {
type = squbs.httpclient
pipeline = dummyFlow
}
如果没有用于 squbs 客户端的自定义管道, 则只需省略。
使用上述配置, 管线将如下所示:
+---------+ +---------+ +---------+ +----------+
RequestContext ~>| |~> | |~> | |~> | Host |
| default | | dummy | | default | |Connection|
| PreFlow | | flow | | PostFlow| | Pool |
RequestContext <~| |<~ | |<~ | |<~ | Flow |
+---------+ +---------+ +---------+ +----------+
Bidi Flow配置
一个 bidi flow可以如下方式指定:、
dummyflow {
type = squbs.pipelineflow
factory = org.squbs.sample.DummyBidiFlow
}
type: 将配置标识为一个 squbs.pipelineflow。
factory: 这个工厂类创建BidiFlow from。
DummyBidiFlow示例如下所示:
class DummyBidiFlow extends PipelineFlowFactory {
override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
BidiShape.fromFlows(inbound, outbound)
})
}
}
中止流
在某些情况下, 管道中的一个阶段可能需要中止流并返回一个HttpResponse, 例如, 在进行身份验证/授权时。在这种情况下, 应跳过管线的其余部分, 并且请求不应到达 squbs 服务。要跳过其余的流:
- 流中需要增加带有abortable的构造器,例如b.add(authorization abortable)。
- 在RequestContext上调用带有一个HttpResponse的abortWith,当你需要中止的时候。
下面DummyAbortableBidiFlow例子,authorization是一个带有abortable 的bidi flow ,并且当用户没有授权的时候中止流:
class DummyAbortableBidiFlow extends PipelineFlowFactory {
override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})
val inboundOutboundB = b.add(authorization abortable)
inboundA ~> inboundOutboundB.in1
inboundOutboundB.out1 ~> inboundC
inboundOutboundB.in2 <~ outboundC
outboundA <~ inboundOutboundB.out2
BidiShape(inboundA.in, inboundC.out, outboundC.in, outboundA.out)
})
}
val authorization = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val authorization = b.add(Flow[RequestContext] map { rc =>
if(!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
else rc
})
val noneFlow = b.add(Flow[RequestContext]) // Do nothing
BidiShape.fromFlows(authorization, noneFlow)
})
}
一旦流添加了abortable, bidi flow 就会被连接。此bidi flow 检查是否存在HttpResponse并绕过或发送下游请求。上述DummyAbortableBidiFlow看起来是这样:
| +-----------+ +-----------+ | +-----------+
+-----------+ +---------+ | | | ~> | filter o~~~0 ~>| |
| | | | | | | |not aborted| | | inboundC | ~> RequestContext
RequestContext ~> | inboundA |~> | |~> 0~~o broadcast | +-----------+ | | |
| | | | | | | | +-----------+
+-----------+ | | | | | ~> +-----------+ |
| inbound | | +-----------+ | filter | |
| outbound| | | aborted | |
+-----------+ | B | | +-----------+ <~ +-----------+ | +-----------+
| | | | | | | | | |
RequestContext <~ | outboundA | <~| | <~0~~o merge | | | outboundC | <~ RequestContext
| | | | | | o~~~~~~~~~~~~~~~~~~~~0 <~| |
+-----------+ +---------+ | +-----------+ | +-----------+
+-----------------------------------+