Akka HTTP之服务器发送的事件支持

服务器发送的事件(SSE)是一个用于将通知从 HTTP 服务器推送到客户端轻量级、标准化协议。与提供双向通信的 WebSocket 相比, SSE 只允许从服务器到客户端的单向通信。如果这是你所需要的, SSE 的优点是要简单得多, 只能依赖于 HTTP, 并提供浏览器中断的连接的重试语义。

根据 SSE 规范, 客户端可以通过 HTTP 从服务器请求事件流。服务器使用具有固定字符编码 UTF-8 的媒体类型text/event-stream进行响应, 并保持响应打开, 以便在可用时将事件发送到客户端。事件是文本结构, 它持有字段并以空行终止, 例如

data: { "username": "John Doe" }
event: added
id: 42

data: another event

重新连接后,客户端可以选择发送Last-Event-ID(标识最后一个已接受事件)头部给服务器。

模型

Akka HTTP 将事件流表示为Source[ServerSentEvent, NotUsed], 其中 ServerSentEvent 是具有以下只读属性的样例类:

  • data: String – 实际有效载荷, 可能跨越多行
  • eventType: Option[String] – 可选限定符, 例如. “added”, “removed”, 等等.
  • id: Option[String] – 可选标识符
  • retry: Option[Int] – 可选的重新连接延迟 (毫秒)

根据 SSE 规范Akka HTTP 还提供了Last-Event-ID头部和text/event-stream媒体类型。

服务器端用法: 编组

为了响应带有事件流的 HTTP 请求, 必须将 EventStreamMarshalling 定义的 ToResponseMarshaller[Source[ServerSentEvent, Any]] 隐式引入到各自路由定义的范围中:

import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}

客户端用法:解组

为了解组作为Source[ServerSentEvent, NotUsed]的事件流, 必须将 EventStreamUnmarshalling 定义的 FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]] 隐式引入到范围中:

import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamUnmarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}

请注意, 如果您正在寻找一种能够永久订阅事件流的弹性方法, Alpakka 提供的 EventSource 连接器可以使用上次收到的事件 id 自动重新连接。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,463评论 19 139
  • SSE概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。这要...
    wavesnow阅读 9,282评论 3 5
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 13,871评论 6 13
  • 外面飘起了雨滴 看这时间也只能各奔东西 习惯了并肩而行 因为你有迷人的魅力 雨水模糊了眼镜 习惯了孤单落寂 习惯了...
    小糊童Mr阅读 1,430评论 0 0
  • 一大清早,秦奎安就带雷杰回到了雷厉的家中,面对着满布鸡血没有清理的场景,再加上秦奎安昨晚的那一番话,让雷杰更加的害...
    GARBIE阅读 1,482评论 2 1