squbs-22.流请求/响应Pipeline

原文地址:Streaming Pipeline

概览

我们通常需要跨服务端/客户端的公共基础设施或组织标准。这些基础设施包括而不限于,日志,指标手机,请求跟踪,授权/认证,cookie管理,A/B测试,等等。

由于squbs促进隔离关注,这个逻辑属于基础设施而不是服务端/客户端实现。squbs streaming pipeline可以基础设施提供组件安装至服务端/客户端,而不需要服务端/客户端的所有者来担心这些方面。

一般来说,squbs streaming pipeline是一个双向流,作为之间的一个桥梁:

  • Akka HTTP层和squbs服务:

    • 所有的请求消息从Akka Http到squbs服务将会经过pipeline
    • 反之亦然,所有的来自squbs的响应消息将经过pipeline
  • squbs客户端和 Akka HTTP 主机连接池的flow

    • 所有请求从squbs client 到 Akka HTTP 主机连接池将经过pipeline
      *反之亦然,所有来自Akka HTTP主机连接池至squbs客户端的响应将经过pipeline。

流pipeline描述

默认的pre/post flow通过下面的配置定义,将自动建立客户端/服务端两端之间的pipeline,除非服务端/客户端各自的defaultPipeline配置设置为off

squbs.pipeline.streaming.defaults {
    pre-flow = defaultPreFlow
    post-flow = defaultPostFlow
}

service声明pipeline

squbs-meta.conf, 你可以为一个服务指定一个pipeline:

squbs-services = [
  {
    class-name = org.squbs.sample.MyActor
    web-context = mypath
    pipeline = dummyflow
  }
]

如果这里没有针对 squbs-service个性化的pipeline,主需要忽略。

通过以上配置,pipeline看起来如下:

                 +---------+   +---------+   +---------+   +---------+
RequestContext ~>|         |~> |         |~> |         |~> |         | 
                 | default |   |  dummy  |   | default |   |  squbs  |
                 | PreFlow |   |  flow   |   | PostFlow|   | service | 
RequestContext <~|         |<~ |         |<~ |         |<~ |         |
                 +---------+   +---------+   +---------+   +---------+

RequestContext 基本上是一个 HttpRequestHttpResponse的包装,还允许携带上下文信息。

客户端pipeline的声明

application.conf中,你可以为一个客户端指定一个pipeline:

sample {
  type = squbs.httpclient
  pipeline = dummyFlow
}

如果没有为squbs-client提供自定义的pipeline ,只需要忽略。

通过以上配置,pipeline看起来如下:

                 +---------+   +---------+   +---------+   +----------+
RequestContext ~>|         |~> |         |~> |         |~> |   Host   | 
                 | default |   |  dummy  |   | default |   |Connection|
                 | PreFlow |   |  flow   |   | PostFlow|   |   Pool   | 
RequestContext <~|         |<~ |         |<~ |         |<~ |   Flow   |
                 +---------+   +---------+   +---------+   +----------+

Bidi Flow配置

一个双向流可以配置如下:

dummyflow {
  type = squbs.pipelineflow
  factory = org.squbs.sample.DummyBidiFlow
}
  • type: 确定配置的身份为 squbs.pipelineflow
  • factory:创建BidiFlow的工厂类

DummyBidiFlow的一个例子:

class DummyBidiFlow extends PipelineFlowFactory {

  override def create(implicit system: ActorSystem): PipelineFlow = {
     BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
      val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
      val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
      BidiShape.fromFlows(inbound, outbound)
    })
  }
}

中止flow

在一些具体场景下,一个pipeline中的stage可能需要中止flow并返回一个HttpResponse,例如,在授权/认证情况下。在这些场景下,应当跳过管道的其余部分,请求不应该抵达squbs服务。来跳过flow的剩余部分:

  • flow需要在构造时加入abortable,例如b.add(authorization abortable)
  • 当你需要中止时,通过HttpResponseRequestContext 上调用abortWith

在下面DummyAbortableBidiFlow例子,authorizationabortable 之间是一个bidi flow 。当用户认证未通过时,它中止flow:

class DummyAbortableBidiFlow extends PipelineFlowFactory {

  override def create(implicit system: ActorSystem): PipelineFlow = {

    BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
      val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
      val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
      val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})

      val inboundOutboundB = b.add(authorization abortable)

      inboundA ~>  inboundOutboundB.in1
                   inboundOutboundB.out1 ~> inboundC
                   inboundOutboundB.in2  <~ outboundC
      outboundA <~ inboundOutboundB.out2

      BidiShape(inboundA.in, inboundC.out, outboundC.in, outboundA.out)
    })
  }

  val authorization = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>

    val authorization = b.add(Flow[RequestContext] map { rc =>
        if(!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
        else rc
    })

    val noneFlow = b.add(Flow[RequestContext]) // Do nothing

    BidiShape.fromFlows(authorization, noneFlow)
  })
}

一旦flow加入 abortable, bidi flow被连接。bidi flow检查HttpResponse的存在,绕过和发送下游请求。这就是上面的DummyAbortableBidiFlow看起来:

                                                +-----------------------------------+
                                                |  +-----------+    +-----------+   |   +-----------+
                  +-----------+   +---------+   |  |           | ~> |  filter   o~~~0 ~>|           |
                  |           |   |         |   |  |           |    |not aborted|   |   | inboundC  | ~> RequestContext
RequestContext ~> | inboundA  |~> |         |~> 0~~o broadcast |    +-----------+   |   |           |
                  |           |   |         |   |  |           |                    |   +-----------+
                  +-----------+   |         |   |  |           | ~> +-----------+   |
                                  | inbound |   |  +-----------+    |  filter   |   |
                                  | outbound|   |                   |  aborted  |   |
                  +-----------+   |   B     |   |  +-----------+ <~ +-----------+   |   +-----------+
                  |           |   |         |   |  |           |                    |   |           |
RequestContext <~ | outboundA | <~|         | <~0~~o   merge   |                    |   | outboundC | <~ RequestContext
                  |           |   |         |   |  |           o~~~~~~~~~~~~~~~~~~~~0 <~|           |
                  +-----------+   +---------+   |  +-----------+                    |   +-----------+
                                                +-----------------------------------+

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容