ClusterClient是一个集群客户端,主要用于集群外部系统与集群通信,只需要知道一个集群中的节点作为初始的联络点。就可将这个节点注册为该集群的接待员(receptionist actor),通过这个接待员我们就可以在外部系统建立通信了。ClusterClient会监视与接待员的连接,如果连接故障了,它会建立一条新的连接。当查找一个新的接待员时,它会从之前的建立的节点中获取新的联络点,或者周期性的刷新联络点。所以联络点不一定是初始联络点。
当你使用集群客户端时,必须要更改akka.actor.provider,将local改为remote或cluster。
接待员在集群中所有的节点上启动。接待员可以是 ClusterReceptionist 扩展的也可以是一个普通的actor 。
集群中的actor需要在mediator中注册,然后Cluster Client就可以通过 集群中的接待员 来访问mediator的注册表中的任何actor。
在Cluster Client中也有一个集群客户端接待员ClusterClientReceptionist 将客户端能访问到的节点注册到列表中。就像mediator一样 ClusterClient 和 ClusterClientReceptionist 之间发送的事件可以订阅。 订阅发送消息的类型有三种
①.ClusterClient.Send
将消息发送至任意一节点
②.ClusterClient.SendToAll
消息将会发给集群中的所有匹配path的节点
③.ClusterClient.Publish
将消息发送给订阅了某个主题的所有actor
接待员打开专门的通道让目标actor的响应消息发送到客户端。可以避免其他集群中的节点干扰
因为客户端通常应该通过ClusterClient发送后续消息。如果客户端直接联系集群中的actor,那actor的回复可以发送到原始发送方。
消息缓冲:当向新接待员建立连接时,ClusterClient会缓冲消息,等连接建立完成后再发送消息。如果缓存满了ClusterClient会将旧的消息抛弃。可以通过配置来改变缓冲区的大小,0代表禁用。(具体配置见下文- 配置)
防止消息丢失:为了防止消息丢失客户端actor和目标actor之间也要保证 at-least-once message delivery(至少一次通信)。
一个集群客户端示例:
首先在集群中的节点启动接待员,配置扩展属性。
akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
(ClusterClientReceptionist 使用 DistributedPubSub扩展)
然后在ClusterClientReceptionist中注册客户端可用的actor
在客户端,创建ClusterClient actor 并向集群中的某个地方发送识别消息。
上面代码中initialContacts参数是Set[ActorPath],可以这样配置
事件
上文说道ClusterClient 和 ClusterClientReceptionist 之间发送的事件可以订阅,因此有一个actor来接收(contact points)可用接待员的地址。
同样地,我们还要创建一个actor来接收与ClusterClientReceptionist连接的cluster client
依赖
使用Cluster Client要添加以下依赖
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.11"
配置
配置中看到当集群客户端启动时,必须提供一个初始联系列表,该列表是接待员正在运行的集群节点。然后,它会重复地尝试联系那些直到它与其中一个的联系成功。(上文配置中establishing-get-contacts-interval)。 在运行时,通过来自Receptionist的数据联系列表会不断地更新。(上文配置中 refresh-contacts-interval )
当客户端正在运行时,它会检测到与receptionist 的连接失败,如果超过了可配置的心跳次数,客户将尝试重新连接到它已知的联系列表,以找到它可以访问的接待员。
reconnect-timeout配置时间间隔,在超过规定时间间隔中客户端找到一个receptionist 连接到集群时,cluster client将被停止。在新地址上重新启动,然后通过访问某种服务注册中心来更新一组初始联系人,从而启动一个新的集群客户端。