squbs-20. 流的生命周期

原文地址:Streams Lifecycle
Akka Streams/Reactive流需要和服务的Runtime Lifecycle 集成。为了这个,一个自动化的或半自动话的集成通过 PerpetualStream实现。为了直接对流源的细粒度控制, LifecycleManaged提供一个包装,可以控制任何源组件的可能发生的停止或关闭,以便流可以优雅的启动/关闭。

永久流(PerpetualStream)

PerpetualStream是一个特性(triat),允许声明一个可以启动的流,当服务优雅的启动和关闭,不会在服务关闭的时候丢失消息。

使用 PerpetualStream的流将符合以下要求,将允许PerpetualStream 中的钩子通过最小自定义重写数无缝的工作:

  1. killSwitch.flow作为在source之后的流的第一个阶段。killSwitch 是一个标准的Akka SharedKillSwitch,通过PerpetualStream特性(trait)提供。
  2. stream实现 FutureProduct通过它们最后的元素。 ProductTuple, List和其他的超类。Sink物化Future是很自然的。如果更多的物化值需要,它通常来自某种形式的 ProductSink成为流最后的元素,也通常物化Product的最后一个元素。
  3. Future呈现流的完结(物化值或最后的物化值)。换句话说,流结束时future完成。

如果满足以上所有要求,没有其他自定义重写用于PerpetualStream函数。下面的代码符合PerpetualStream

class WellBehavedStream extends PerpetualStream[Future[Done]] {

  def generator = Iterator.iterate(0) { p => 
    if (p == Int.MaxValue) 0 else p + 1 
  }

  val source = Source.fromIterator(generator _)

  val ignoreSink = Sink.ignore
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(ignoreSink) {
    implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        source ~> killSwitch.flow[Int] ~> sink
        ClosedShape
  })
}

这就是。这个流行为良好,因为它物化 sink物化值,即Future[Done]

关闭重写

有时不可能定义良好的流。举个例子,Sink可能不会物化 Future或你需要在关闭时做更多的清理。因为这个原因,可以通过重写shutdown如下:

  override def shutdown(): Future[Done] = {
    // Do all your cleanup
    // For safety, call super
    super.shutdown()
    // The Future from super.shutdown may not mean anything.
    // Feel free to create your own future that identifies the
    // stream being done. Return your Future instead.
  }

shutdown需要使用如下:

  1. 初始化流的关闭
  2. 做其他清理
  3. 当流结束处理,返回future

注意:建议任何情况下调用 super.shutdown。调用是无害的或有其他副作用。

备用关闭机制

相比于使用killSwitchsource 可以提供一个更好方式来正确的关闭。在这种情况下,仅使用source的关闭机制和重写 shutdown来发起source的关闭。killSwitch 依然未使用。

Kill Switch 重写

如果killSwitch需要跨多流共享,你可以重写 killSwitch来反射共享实例

  override lazy val killSwitch = mySharedKillSwitch

接收消息并将其转发到流

一些流从actor消息中获取输入。虽然可能一些流配置可以物化source中的ActorRef,然而很难调用这个actor。因为PerpetualStream自身是个actor,他可以拥有一个公开的地址/路径并且转发消息至流source。这样做,我们需要重写receive 如下:

  override def receive = {
    case msg: MyStreamMessage =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }

处理流错误

PerpetualStream默认从错误中恢复不会被流stage捕获。这个消息引起忽略异常。如果需要一个不同的行为的话,重写 decider

  override def decider: Supervision.Decider = { t => 
    log.error("Uncaught error {} from stream", t)
    t.printStackTrace()
    Restart
  }

Restart将重启有错误的stage,而不会重启stream。查看Supervision Strategies获得可能的策略。

结合一下

下面的例子尽可能重写上面讨论的内容:

class MsgReceivingStream extends PerpetualStream[(ActorRef, Future[Done])] {

  val actorSource = Source.actorPublisher[MyStreamMsg](Props[MyPublisher])
  val ignoreSink = Sink.ignore[MyStreamMsg]
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(actorSource, ignoreSink)((_, _)) {
    implicit builder =>
      (source, sink) =>
        import GraphDSL.Implicits._
        source ~> sink
        ClosedShape
  })
  
  // Just forward the message to the stream source
  override def receive = {
    case msg: MyStreamMsg =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }
  
  override def shutdown() = {
    val (sourceActorRef, _) = matValue
    sourceActorRef ! cancelStream
    // Sink materialization conforms
    // so super.shutdown() will give the right future
    super.shutdown()
  }
}

制作一个Lifecycle-Sensitive source

如果你期望拥有一个source,完全链接squbs的生命周期的动作,你可以将source包裹 LifecycleManaged:

Scala

val inSource = <your-original-source>
val aggregatedSource = LifecycleManaged().source(inSource)

Java

final Source inSource = <your-original-source>
final Source aggregatedSource = new LifecycleManaged(system).source(inSource);

这个结果source将集成source实例化成一个(T, ActorRef)TinSource 的实例化类型, ActorRef 是 trigger actor的实例化类型(从Unicomplex接收事件,squbs的容器)

这个集成source直到生命周期状态变成Active才开始从源source中发出,并且在生命周期状态成为 Stopping之后停止发出元素和关闭流。

个性化集成Triggered Source

如果你想要你的flow启动/停用个性化的事件,你可以整合一个个性化的trigger source,元素true将会启用,元素false将会停用。

注意 Trigger有一个参数eagerComplete默认为false在scala中,而在JAVA中需要传递。如果eagerComplete设置为false,trigger source 的结束或终止将脱离这个触发。如果设置为true,这个终止会完成这个流。

Scala

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = <your-custom-trigger-source>.collect {
  case 0 => DISABLE
  case 1 => ENABLE
}

val aggregatedSource = new Trigger().source(inSource, trigger)

Java

import static org.squbs.stream.TriggerEvent.DISABLE;
import static org.squbs.stream.TriggerEvent.ENABLE;

final Source<?, ?> inSource = <your-original-source>;
final Source<?, ?> trigger = <your-custom-trigger-source>.collect(new PFBuilder<Integer, TriggerEvent>()
    .match(Integer.class, p -> p == 1, p -> ENABLE)
    .match(Integer.class, p -> p == 0, p -> DISABLE)
    .build());

final Source aggregatedSource = new Trigger(false).source(inSource, trigger);

个性化的生命周期事件触发

如果你想要在ActiveStopping之外响应更多生命周期,举个例子,你想要Failed来同时关闭flow,你可以修改生命周期事件映射。

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = Source.actorPublisher[LifecycleState](Props.create(classOf[UnicomplexActorPublisher]))
  .collect {
    case Active => ENABLE
    case Stopping | Failed => DISABLE
  }

val aggregatedSource = new Trigger().source(inSource, trigger)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容