原文地址:Streaming Pipeline
概览
我们通常需要跨服务端/客户端的公共基础设施或组织标准。这些基础设施包括而不限于,日志,指标手机,请求跟踪,授权/认证,cookie管理,A/B测试,等等。
由于squbs促进隔离关注,这个逻辑属于基础设施而不是服务端/客户端实现。squbs streaming pipeline可以基础设施提供组件安装至服务端/客户端,而不需要服务端/客户端的所有者来担心这些方面。
一般来说,squbs streaming pipeline是一个双向流,作为之间的一个桥梁:
-
Akka HTTP层和squbs服务:
- 所有的请求消息从Akka Http到squbs服务将会经过pipeline
- 反之亦然,所有的来自squbs的响应消息将经过pipeline
-
squbs客户端和 Akka HTTP 主机连接池的flow
- 所有请求从squbs client 到 Akka HTTP 主机连接池将经过pipeline
*反之亦然,所有来自Akka HTTP主机连接池至squbs客户端的响应将经过pipeline。
- 所有请求从squbs client 到 Akka HTTP 主机连接池将经过pipeline
流pipeline描述
默认的pre/post flow通过下面的配置定义,将自动建立客户端/服务端两端之间的pipeline,除非服务端/客户端各自的defaultPipeline
配置设置为off
。
squbs.pipeline.streaming.defaults {
pre-flow = defaultPreFlow
post-flow = defaultPostFlow
}
service声明pipeline
在 squbs-meta.conf
, 你可以为一个服务指定一个pipeline:
squbs-services = [
{
class-name = org.squbs.sample.MyActor
web-context = mypath
pipeline = dummyflow
}
]
如果这里没有针对 squbs-service个性化的pipeline,主需要忽略。
通过以上配置,pipeline看起来如下:
+---------+ +---------+ +---------+ +---------+
RequestContext ~>| |~> | |~> | |~> | |
| default | | dummy | | default | | squbs |
| PreFlow | | flow | | PostFlow| | service |
RequestContext <~| |<~ | |<~ | |<~ | |
+---------+ +---------+ +---------+ +---------+
RequestContext
基本上是一个 HttpRequest
和HttpResponse
的包装,还允许携带上下文信息。
客户端pipeline的声明
在application.conf
中,你可以为一个客户端指定一个pipeline:
sample {
type = squbs.httpclient
pipeline = dummyFlow
}
如果没有为squbs-client提供自定义的pipeline ,只需要忽略。
通过以上配置,pipeline看起来如下:
+---------+ +---------+ +---------+ +----------+
RequestContext ~>| |~> | |~> | |~> | Host |
| default | | dummy | | default | |Connection|
| PreFlow | | flow | | PostFlow| | Pool |
RequestContext <~| |<~ | |<~ | |<~ | Flow |
+---------+ +---------+ +---------+ +----------+
Bidi Flow配置
一个双向流可以配置如下:
dummyflow {
type = squbs.pipelineflow
factory = org.squbs.sample.DummyBidiFlow
}
- type: 确定配置的身份为
squbs.pipelineflow
。 - factory:创建
BidiFlow
的工厂类
DummyBidiFlow
的一个例子:
class DummyBidiFlow extends PipelineFlowFactory {
override def create(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
BidiShape.fromFlows(inbound, outbound)
})
}
}
中止flow
在一些具体场景下,一个pipeline中的stage可能需要中止flow并返回一个HttpResponse
,例如,在授权/认证情况下。在这些场景下,应当跳过管道的其余部分,请求不应该抵达squbs服务。来跳过flow的剩余部分:
- flow需要在构造时加入
abortable
,例如b.add(authorization abortable)
。 - 当你需要中止时,通过
HttpResponse
在RequestContext
上调用abortWith
。
在下面DummyAbortableBidiFlow
例子,authorization
与abortable
之间是一个bidi flow 。当用户认证未通过时,它中止flow:
class DummyAbortableBidiFlow extends PipelineFlowFactory {
override def create(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})
val inboundOutboundB = b.add(authorization abortable)
inboundA ~> inboundOutboundB.in1
inboundOutboundB.out1 ~> inboundC
inboundOutboundB.in2 <~ outboundC
outboundA <~ inboundOutboundB.out2
BidiShape(inboundA.in, inboundC.out, outboundC.in, outboundA.out)
})
}
val authorization = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val authorization = b.add(Flow[RequestContext] map { rc =>
if(!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
else rc
})
val noneFlow = b.add(Flow[RequestContext]) // Do nothing
BidiShape.fromFlows(authorization, noneFlow)
})
}
一旦flow加入 abortable
, bidi flow被连接。bidi flow检查HttpResponse
的存在,绕过和发送下游请求。这就是上面的DummyAbortableBidiFlow
看起来:
+-----------------------------------+
| +-----------+ +-----------+ | +-----------+
+-----------+ +---------+ | | | ~> | filter o~~~0 ~>| |
| | | | | | | |not aborted| | | inboundC | ~> RequestContext
RequestContext ~> | inboundA |~> | |~> 0~~o broadcast | +-----------+ | | |
| | | | | | | | +-----------+
+-----------+ | | | | | ~> +-----------+ |
| inbound | | +-----------+ | filter | |
| outbound| | | aborted | |
+-----------+ | B | | +-----------+ <~ +-----------+ | +-----------+
| | | | | | | | | |
RequestContext <~ | outboundA | <~| | <~0~~o merge | | | outboundC | <~ RequestContext
| | | | | | o~~~~~~~~~~~~~~~~~~~~0 <~| |
+-----------+ +---------+ | +-----------+ | +-----------+
+-----------------------------------+