本部分内容都需要引用:
import akka.actor.{ Actor, ActorRef, Props }
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import java.net.InetSocketAddress
所有的 Akka I/O API都需要通过manager访问,当使用 I/O API时,第一步就是对manager的引用。
import akka.io.{ IO, Tcp }
import context.system // implicitly used by IO(Tcp)
val manager = IO(Tcp)
manager是处理底层I/O资源(选择器、通道)并为特定任务(如监听传入连接)实例化worker的actor。
连接(client side)
object Client {
def props(remote: InetSocketAddress, replies: ActorRef) =
Props(classOf[Client], remote, replies)
}
class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
import Tcp._
import context.system
//连接远程的第一步给TCP manag发送Connect
IO(Tcp) ! Connect(remote)
def receive = {
//Tcp manager会返回 两种消息
case CommandFailed(_: Connect) ⇒
listener ! "connect failed"
context stop self
case c @ Connected(remote, local) ⇒
listener ! c
//激活连接,给connection actor 发送注册消息。来通知谁该接收socket数据
val connection = sender()
connection ! Register(self)
context become {
case data: ByteString ⇒
connection ! Write(data)
case CommandFailed(w: Write) ⇒
// O/S buffer was full
listener ! "write failed"
case Received(data) ⇒
listener ! data
case "close" ⇒
connection ! Close
case _: ConnectionClosed ⇒
listener ! "connection closed"
context stop self
}
}
}
连接远程地址的第一步就是给TCP manager 发送一个Connect
消息。
然后TCP manager 会回复一个 CommandFailed
消息或者产生一个内部actor来表示新的connection 。这个 connection actor会发送一个Connected
消息给Client。
为了激活新连接,需要发送给 connection actor 注册消息来通知谁该接收socket中的数据。并且要等注册消息传递过程完成以后连接才可以使用。而且这段过程如果超时connection actor 会自己终止。
connection actor一直监视注册处理程序,如果它停止, connection actor会关闭连接,清理所有的连接相关的资源。
client 使用become 方法演示从未连接到连接状态中观察到的命令和事件。
接受连接(server side)
class Server extends Actor {
import Tcp._
import context.system
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
def receive = {
case b @ Bound(localAddress) ⇒
context.parent ! b
case CommandFailed(_: Bind) ⇒ context stop self
case c @ Connected(remote, local) ⇒
val handler = context.actorOf(Props[SimplisticHandler])
val connection = sender()
connection ! Register(handler)
}
}
要创建TCP服务器并监听入站连接,必须将Bind
命令发送给TCP manager。TCP manager监听特定InetSocketAddress
上的TCP连接,端口为0表示连接到随机端口。
发送bind
消息的actor将收到bound
消息,表明服务器已经准备好接受传入的连接。bound
消息里包含InetSocketAddress
(解析IP地址和端口)
处理连接内容 与外部连接过程一样。将需要读取的内容交给一个处理程序(SimplisticHandler)完成操作。
class SimplisticHandler extends Actor {
import Tcp._
def receive = {
case Received(data) ⇒ sender() ! Write(data)
case PeerClosed ⇒ context stop self
}
}
关闭连接
Close
、ConfirmedClose
、Abort
这是三个命令发送给connection actor 都可以关闭连接。
-
Close
是通过FIN
消息关闭连接,不用等待远程端点确认消息。listener会收到Closed
-
ConfirmedClose
也是通过FIN
消息关闭连接,但是发送FIN
消息后仍然持续运行,知道收到远程的确认消息。listener会收到ConfirmedClosed
-
Abort
将通过向远程端点发送RST
消息立即终止连接。listener会收到Aborted
如果远程端点关闭连接,会发送PeerClosed
给listener。为了支持半关闭连接,将注册消息的keepOpenOnPeerClosed
成员设置为true,在这种情况下,连接保持打开状态,直到接收到上述关闭命令之一为止。
当发生错误导致连接被关闭时,ErrorClosed
将被发送给侦听器。
Write to a connection
一旦连接建立,任何actor通过Tcp.WriteCommand
形式发送数据,Tcp.WriteCommand
只是个抽象类,有三种具体的实现方法。
-
Tcp.Write
最简单的WriteCommand实现,它封装了一个ByteString实例和一个“ack”事件。 -
Tcp.WriteFile
高效地发送文件中的“原始”数据。 -
Tcp.CompoundWrite
将几个Tcp.Write
和(或)Tcp.WriteFile
放进一个命令中。优点如下:
1.TCP连接actor一次只能处理一个write命令。通过将几个Write放到一个CompoundWrite中,可以让它们在最小开销的情况下通过连接发送,而无需通过基于ack-base
的消息协议将它们发送给连接actor。
2.因为WriteCommand
是原子写入,所以当结合进CompoundWrite后,其他actor不能注入其他写入。
3.因为CompoundWrite的子写入是Write和WriteFile,它们可以请求Ack消息。当这些子写入完成后发送Ack。通过connection actor在任意点发送ack可以看到CompoundWrite的过程进展。
读写节流
Tcp connection actor 的基本模型是没有内部缓冲器的,因为一次只能处理一个写入。无论写入还是读取都需要在用户级别控制拥塞。
关于背压 写入有三种操作模式:
1. ACK-based
:每个写命令都带有一个任意对象,如果这个对象不是Tcp.NoAck
。在成功地将所有包含的数据写入套接字后,它将被返回给发送方。如果在收到此确认之前没有其他的写操作,那么由于缓冲区溢出而不会发生故障。
2.NACK-based
:当下个写入到达,但上个写入还未完成时,将回复一个包含失败写入的CommandFailed
消息。