Akka 2.5.12 Using TCP

本部分内容都需要引用:

    import akka.actor.{ Actor, ActorRef, Props }
    import akka.io.{ IO, Tcp }
    import akka.util.ByteString
    import java.net.InetSocketAddress

所有的 Akka I/O API都需要通过manager访问,当使用 I/O API时,第一步就是对manager的引用。

    import akka.io.{ IO, Tcp }
    import context.system // implicitly used by IO(Tcp)

    val manager = IO(Tcp)

manager是处理底层I/O资源(选择器、通道)并为特定任务(如监听传入连接)实例化worker的actor。

连接(client side)

    object Client {
      def props(remote: InetSocketAddress, replies: ActorRef) =
        Props(classOf[Client], remote, replies)
    }

    class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {

      import Tcp._
      import context.system
      //连接远程的第一步给TCP manag发送Connect
      IO(Tcp) ! Connect(remote)

      def receive = {
        //Tcp manager会返回 两种消息
        case CommandFailed(_: Connect) ⇒
          listener ! "connect failed"
          context stop self

        case c @ Connected(remote, local) ⇒
          listener ! c
          //激活连接,给connection actor 发送注册消息。来通知谁该接收socket数据
          val connection = sender()
          connection ! Register(self)
          context become {
            case data: ByteString ⇒
              connection ! Write(data)
            case CommandFailed(w: Write) ⇒
              // O/S buffer was full
              listener ! "write failed"
            case Received(data) ⇒
              listener ! data
            case "close" ⇒
              connection ! Close
            case _: ConnectionClosed ⇒
              listener ! "connection closed"
              context stop self
          }
      }
    }

连接远程地址的第一步就是给TCP manager 发送一个Connect消息。
然后TCP manager 会回复一个 CommandFailed消息或者产生一个内部actor来表示新的connection 。这个 connection actor会发送一个Connected消息给Client。
为了激活新连接,需要发送给 connection actor 注册消息来通知谁该接收socket中的数据。并且要等注册消息传递过程完成以后连接才可以使用。而且这段过程如果超时connection actor 会自己终止。
connection actor一直监视注册处理程序,如果它停止, connection actor会关闭连接,清理所有的连接相关的资源。
client 使用become 方法演示从未连接到连接状态中观察到的命令和事件。

接受连接(server side)

      class Server extends Actor {

      import Tcp._
      import context.system

      IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))

      def receive = {
        case b @ Bound(localAddress) ⇒
          context.parent ! b

        case CommandFailed(_: Bind) ⇒ context stop self

        case c @ Connected(remote, local) ⇒
          val handler = context.actorOf(Props[SimplisticHandler])
          val connection = sender()
          connection ! Register(handler)
      }

    }

要创建TCP服务器并监听入站连接,必须将Bind命令发送给TCP manager。TCP manager监听特定InetSocketAddress上的TCP连接,端口为0表示连接到随机端口。

发送bind消息的actor将收到bound消息,表明服务器已经准备好接受传入的连接。bound消息里包含InetSocketAddress(解析IP地址和端口)
处理连接内容 与外部连接过程一样。将需要读取的内容交给一个处理程序(SimplisticHandler)完成操作。

    class SimplisticHandler extends Actor {
      import Tcp._
      def receive = {
        case Received(data) ⇒ sender() ! Write(data)
        case PeerClosed     ⇒ context stop self
      }
    }

关闭连接

CloseConfirmedCloseAbort 这是三个命令发送给connection actor 都可以关闭连接。

  • Close是通过 FIN消息关闭连接,不用等待远程端点确认消息。listener会收到Closed
  • ConfirmedClose也是通过 FIN消息关闭连接,但是发送FIN消息后仍然持续运行,知道收到远程的确认消息。listener会收到ConfirmedClosed
  • Abort将通过向远程端点发送RST消息立即终止连接。listener会收到Aborted

如果远程端点关闭连接,会发送PeerClosed给listener。为了支持半关闭连接,将注册消息的keepOpenOnPeerClosed成员设置为true,在这种情况下,连接保持打开状态,直到接收到上述关闭命令之一为止。

当发生错误导致连接被关闭时,ErrorClosed将被发送给侦听器。

Write to a connection

一旦连接建立,任何actor通过Tcp.WriteCommand形式发送数据,Tcp.WriteCommand只是个抽象类,有三种具体的实现方法。

  • Tcp.Write

    最简单的WriteCommand实现,它封装了一个ByteString实例和一个“ack”事件。
  • Tcp.WriteFile

    高效地发送文件中的“原始”数据。
  • Tcp.CompoundWrite

    将几个Tcp.Write和(或)Tcp.WriteFile放进一个命令中。优点如下:
    1.TCP连接actor一次只能处理一个write命令。通过将几个Write放到一个CompoundWrite中,可以让它们在最小开销的情况下通过连接发送,而无需通过基于ack-base的消息协议将它们发送给连接actor。
    2.因为WriteCommand是原子写入,所以当结合进CompoundWrite后,其他actor不能注入其他写入。
    3.因为CompoundWrite的子写入是Write和WriteFile,它们可以请求Ack消息。当这些子写入完成后发送Ack。通过connection actor在任意点发送ack可以看到CompoundWrite的过程进展。

读写节流

Tcp connection actor 的基本模型是没有内部缓冲器的,因为一次只能处理一个写入。无论写入还是读取都需要在用户级别控制拥塞。

关于背压 写入有三种操作模式:
1. ACK-based:每个写命令都带有一个任意对象,如果这个对象不是Tcp.NoAck。在成功地将所有包含的数据写入套接字后,它将被返回给发送方。如果在收到此确认之前没有其他的写操作,那么由于缓冲区溢出而不会发生故障。
2.NACK-based:当下个写入到达,但上个写入还未完成时,将回复一个包含失败写入的CommandFailed消息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容