Akka Stream之流中的错误处理

当流中的某个阶段失败时, 通常会导致整个流被拆掉。此时,每个阶段的下游得到关于失败通知和上游得到关于取消通知。

在许多情况下, 您可能希望避免完全的流失败, 这可以通过几种不同的方法完成:

  • recover发出最终的元素, 然后在上游故障上正常完成流
  • recoverWithRetries创建一个新的上游并从失败开始处理
  • 在后退后重新启动流的部分
  • 对支持监督策略的阶段使用监督策略

除了这些内置的用于错误处理的工具之外, 一个常见的模式是将流包装到一个actor中, 并让actor在失败时重新启动整个流。

Recover

recover允许你注入一个最终元素,然后在上游失败时完成流。通过一个偏函数,决定哪些异常这样恢复。如果有个异常不匹配,流将失败。

如果您希望在失败时优雅地完成流, 而让下游知道出现了故障, 则recover可能很有用。

Source(0 to 6).map(n =>
  if (n < 5) n.toString
  else throw new RuntimeException("Boom!")).recover {
  case _: RuntimeException => "stream truncated"
}.runForeach(println)

则输出可能是:

0
1
2
3
4
stream truncated

recoverWithRetries

recoverWithRetries 允许你在失败的地方放入一个新的上游,在失败到指定的最大次数后恢复流。

通过一个偏函数,决定哪些异常这样恢复。如果有个异常不匹配,流将失败。

val planB = Source(List("five", "six", "seven", "eight"))

Source(0 to 10).map(n =>
  if (n < 5) n.toString
  else throw new RuntimeException("Boom!")).recoverWithRetries(attempts = 1, {
  case _: RuntimeException => planB
}).runForeach(println)

输出将是

0
1
2
3
4
five
six
seven
eight

正如Akka为actor提供回退监督模式一样, Akka stream也提供了一个RestartSourceRestartSinkRestartFlow, 用于实施所谓指数回退监控策略, 在某个阶段失败时再次启动它, 每次重新启动的延迟时间越来越长。

当某个阶段因为外部资源是否可用而失败或完成时,而且需要一些时间重新启动,这种模式有用。一个主要的例子是当一个WebSocket连接因为HTTP服务器运行正在下降(可能因为超负荷)而失败时。通过使用指数回退,避免进行紧密的重新连接,这样既可以让HTTP服务器恢复一段时间,又避免在客户端使用不必要的资源。

以下代码段显示了如何使用akka.stream.scaladsl.RestartSource创建一个回退监管,它将监督给定的Source。本例中,Source是一个服务器发送事件(SSE),由akka-http提供。如果此处流失败,将再次发送请求,以3,6,12,24和最终30秒的间隔增加(此处,由于 maxBackoff 参数,它将保持上限)。

val restartSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
) { () =>
  // Create a source from a future of a source
  Source.fromFutureSource {
    // Make a single request with akka-http
    Http().singleRequest(HttpRequest(
      uri = "http://example.com/eventstream"))
      // Unmarshall it as a source of server sent events
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
  }
}

强烈建议使用 randomFactor 为回退间隔添加一点额外的方差, 以避免在完全相同的时间点重新启动多个流, 例如, 因为它们由于共享资源 (如相同的服务器下线,并在相同间隔后重启) 而停止。通过在重新启动间隔中增加额外的随机性, 这些流将在时间上稍有不同的点开始, 从而避免大量的通信量冲击恢复的服务器或他们都需要联系的其他资源。

上述 RestartSource 将永远不会终止, 除非Sink被送入取消。将它与 KillSwitch 结合使用通常会很方便, 以便在需要时可以终止它:

val killSwitch = restartSource
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
  .run()

doSomethingElse()

killSwitch.shutdown()

Sink和flow也可以被监管,使用akka.stream.scaladsl.RestartSinkakka.stream.scaladsl.RestartFlow。RestartSink 在取消时重新启动, 而在输入端口取消、输出端口完成或输出端口发送错误时重新启动 RestartFlow。

监管策略

注意
支持监管策略的各个阶段都有明文规定, 如果一个阶段的文档中没有说明它遵守监管策略, 就意味着它失败, 而不是采用监管。

错误处理策略受actor监管策略的启发, 但语义已经适应了流处理的领域。最重要的区别是, 监管不是自动应用到流阶段, 而是每个阶段必须显式实现的东西。

在许多阶段, 实现对监管策略的支持可能甚至没有意义, 对于连接到外部技术的阶段尤其如此, 例如, 失败的连接如果立即尝试新连接, 可能仍然会失败。

对于实现监管的阶段, 在通过使用属性物化流时, 可以选择处理流元素的异常处理策略。

有三种方法可以处理应用程序代码中的异常:

  • Stop - 流以失败完成。
  • Resume - 元素被丢弃,流继续执行
  • Restart - 元素被丢弃,且流在重启该阶段后继续执行。重新启动阶段意味着任何累积状态都被清除。 这通常通过创建阶段的新实例来执行。

默认情况下, 停止策略用于所有异常, 即在抛出异常时, 流将以失败完成。

implicit val materializer = ActorMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)

可以在materializer的设置中定义流的默认监管策略。

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

在这里你可以看到, 所有的 ArithmeticException 将恢复处理, 即导致除以零的元素被丢弃了。

注意
请注意, 丢弃元素可能会导致具有循环的图中出现死锁。

还可以为flow的所有操作定义监管策略。

implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}
val flow = Flow[Int]
  .filter(100 / _ < 50).map(elem => 100 / (5 - elem))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)

val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)

重新启动的工作方式与恢复类似,除了故障处理阶段的累加状态(如果有的话)将被重置。

implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Restart
  case _ => Supervision.Stop
}
val flow = Flow[Int]
  .scan(0) { (acc, elem) =>
    if (elem < 0) throw new IllegalArgumentException("negative not allowed")
    else acc + elem
  }
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.limit(1000).runWith(Sink.seq)
// 负数元素导致scan阶段重启
// 即再次从0开始
// 结果将是以Success(Vector(0, 1, 4, 0, 5, 12))完成的Future 

来自mapAsync错误

流监管也可以应用于mapAsyncmapAsyncUnordered的future,即使这些错误发生于future而不是在阶段自身。

假设我们使用外部服务来查找电子邮件地址,我们希望丢弃那些无法找到的地址。

我们开始于推文的作者流:

val authors: Source[Author, NotUsed] =
  tweets
    .filter(_.hashtags.contains(akkaTag))
    .map(_.author)

假设我们可以使用以下方式查找其电子邮件地址:

def lookupEmail(handle: String): Future[String] =

当电子邮件没有找到时,FutureFailure完成。

通过使用lookupEmail服务以及使用mapAsync, 可以将作者流转换为电子邮件地址流, 并使用Supervision.resumingDecider丢弃未知电子邮件地址:

import ActorAttributes.supervisionStrategy
import Supervision.resumingDecider

val emailAddresses: Source[String, NotUsed] =
  authors.via(
    Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
      .withAttributes(supervisionStrategy(resumingDecider)))

如果不使用Resume而是默认的停止策略,那么流将在第一个带有Failure完成的Future时,以失败完成流。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容

  • 1 基本流处理 让我们首先看看使用akka-stream处理流的真正含义。图1展示了在某个处理节点上,元素是一个个...
    乐言笔记阅读 2,651评论 1 1
  • (1)viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], M...
    乐言笔记阅读 2,394评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,650评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,050评论 25 707
  • 《道德经》有云,上善若水,水善利万物而不争,处众人之所恶,故几于道也。 道为何物,老子又说道可道,非常道,名可名,...
    lin秀阅读 1,024评论 0 0