Akka系列(十):Akka集群之Akka Cluster

上一篇文章我们讲了Akka Remote,理解了Akka中的远程通信,其实Akka Cluster可以看成Akka Remote的扩展,由原来的两点变成由多点组成的通信网络,这种模式相信大家都很了解,就是集群,它的优势主要有两点:系统伸缩性高,容错性更好。

集群概念

很多人很容易把分布式和集群的概念搞错,包括我也是,我一开始也以为它们两个是一样的概念,只是叫法不同而已,但其实不然,虽然它们在实际场景中都是部署在不同的机器上,但它们所提供的功能并不是一样的。举个简单的例子来看看它们之间的不同:

为了保持整个系列连续性,我又以抽奖为基础举一个例子:

假定我们现在抽奖流程包括,抽奖分配奖品和用户根据链接领取指定奖品,用户先抽奖然后获取奖品链接,点击链接填写相应信息领取奖品。

1.分布式:

我们现在把抽奖分配奖品和用户根据链接领取指定奖品分别部署在两台机器上,突然有一天很不幸,抽奖活动进行到一半,抽奖分配奖品那台机子所在的区域停电了,很显然,后续的用户参与抽奖就不能进行了,因为我们只有一台抽奖分配奖品的机子,但由于我们将领取奖品的业务部署在另一台机器上,所以前面那些中奖的用户还是可以正常的领取奖品。

2.集群:

现在我们还是有两台机器,但是我们在两个机器上都部署了抽奖分配奖品和用户根据链接领取指定奖品的业务逻辑,突然有一天,有一台所在的区域停电了,但这时我们并担心,因为另一台服务器还是可以正常的运行处理用户的所有请求。

它们的各自特点:

  • 分布式:将一个业务分离成各个子业务模块部署在不同服务器上,每个服务器完成的功能不一样;
  • 集群: 将同一个业务部署在不同的服务器上,每个服务器的功能相同;

总的来说: 分布式是以分离任务缩短时间来提高效率,而集群是在单位时间内处理更多的任务来提高效率。

Akka Cluster

在前面的文章Akka Actor的工作方式,我们可以将一个任务分解成一个个小任务,然后分配给它的子Actor执行,其实这就可以看成一个小的分布式系统,那么在Akka中,集群又是一种怎样的概念呢?

其实往简单里说,就是一些相同的ActorSystem的组合,它们具有着相同的功能,我们需要执行的任务可以随机的分配到目前可用的ActorSystem上,这点跟Nginx的负载均衡很类似,根据算法和配置将请求转发给运行正常的服务器去,Akka集群的表现形式也是这样,当然它背后的理论基础是基于gossip协议的,目前很多分布式的数据库的数据同步都采用这个协议,有兴趣的同学可以自己去研究研究,只是我也是一知半解,这里就不写了,怕误导了大家。

下面我来讲讲Akka Cluster中比较重要的几个概念:

Seed Nodes

Seed Nodes可以看过是种子节点或者原始节点,它的一个主要作用用于可以自动接收新加入集群的节点的信息,并与之通信,使用方式可以用配置文件或者运行时指定,推荐使用配置文件方式,比如:

akka.cluster.seed-nodes = [
  "akka.tcp://ClusterSystem@host1:2552",
  "akka.tcp://ClusterSystem@host2:2552"]

seed-nodes列表中的第一个节点会集群启动的时候初始化,而其他节点则是在有需要时再初始化。

当然你也可以不指定seed nodes,但你可以需要手动或者在程序中写相关逻辑让相应的节点加入集群,具体使用方式可参考官方文档。

Cluster Events

Cluster Events字面意思是集群事件,那么这是什么意思呢?其实它代表着是一个节点的各种状态和操作,举个例子,假设你在打一局王者5v5的游戏,那么你可以把十个人看成一个集群,我们每个人都是一个节点,我们的任何操作和状态都能被整个系统捕获到,比如A杀了B、A超神了,A离开了游戏,A重新连接了游戏等等,这些状态和操作在Cluster Events中就相当于节点之于集群,那么它具体是怎么使用的呢?

首先我们必须将节点注册到集群中,或者说节点订阅了某个集群,我们可以这么做:

cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])

具体代码相关的使用我会再下面写一个demo例子,来说明是如何具体使用它们的。

从上面的代码我们可以看到有一个MemberEvent的概念,这个其实就是每个成员所可能拥有的events,那么一个成员在它的生命周期中有以下的events

  • ClusterEvent.MemberJoined - 新的节点加入集群,此时的状态是Joining;
  • ClusterEvent.MemberUp - 新的节点加入集群,此时的状态是Up;
  • ClusterEvent.MemberExited - 节点正在离开集群,此时的状态是Exiting;
  • ClusterEvent.MemberRemoved - 节点已经离开集群,此时的状态是Removed;
  • ClusterEvent.UnreachableMember - 节点被标记为不可触达;
  • ClusterEvent.ReachableMember - 节点被标记为可触达;

状态说明:

  • Joining: 加入集群的瞬间状态
  • Up: 正常服务状态
  • Leaving / Exiting: 正常移出中状态
  • Down: 被标记为停机(不再是集群决策的一部分)
  • Removed: 已从集群中移除

Roles

虽然上面说到集群中的各个节点的功能是一样的,其实并不一定,比如我们将分布式和集群融合到一起,集群中的一部分节点负责接收请求,一部分用于计算,一部分用于数据存储等等,所以Akka Cluster提供了一种Roles的概念,用来表示该节点的功能特性,我们可以在配置文件中指定,比如:

akka.cluster.roles = request
akka.cluster.roles = compute
akka.cluster.roles = store

ClusterClient

ClusterClient是一个集群客户端,主要用于集群外部系统与集群通信,使用它非常方便,我们只需要将集群中的任意指定一个节点作为集群客户端,然后将其注册为一个该集群的接待员,最后我们就可以在外部系统直接与之通信了,使用ClusterClient需要做相应的配置:

akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

假设我们现在我一个接待的Actor,叫做frontend,我们就可以这样做:

val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
ClusterClientReceptionist(system).registerService(frontend)

Akka Cluster例子

上面讲了集群概念和Akka Cluster中相对重要的概念,下面我们就来写一个Akka Cluster的demo,

demo需求:

线假设需要执行一些相同任务,频率为2s一个,现在我们需要将这些任务分配给Akka集群中的不同节点去执行,这里使用ClusterClient作为集群与外部的通信接口。

首先我们先来定义一些命令:


package sample.cluster.transformation

final case class TransformationJob(text: String) // 任务内容
final case class TransformationResult(text: String) // 执行任务结果
final case class JobFailed(reason: String, job: TransformationJob) //任务失败相应原因
case object BackendRegistration // 后台具体执行任务节点注册事件

然后我们实现具体执行任务逻辑的后台节点:


class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])  //在启动Actor时将该节点订阅到集群中
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => { // 接收任务请求
      val result = text.toUpperCase // 任务执行得到结果(将字符串转换为大写)
      sender() ! TransformationResult(text.toUpperCase) // 向发送者返回结果
    }
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register // 根据节点状态向集群客户端注册
    case MemberUp(m) => register(m)  // 将刚处于Up状态的节点向集群客户端注册
  }

  def register(member: Member): Unit = {   //将节点注册到集群客户端
    context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
      BackendRegistration
  }
}

相应节点的配置文件信息,我这里就不贴了,请从相应的源码demo里获取。源码链接

接着我们来实现集群客户端:


class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef] //任务后台节点列表
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>  //目前暂无执行任务节点可用
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob => //执行相应任务
      jobCounter += 1
      implicit val timeout = Timeout(5 seconds)
      val backend = backends(jobCounter % backends.size) //根据相应算法选择执行任务的节点
      println(s"the backend is ${backend} and the job is ${job}")
      val result  = (backend ? job)
        .map(x => x.asInstanceOf[TransformationResult])  // 后台节点处理得到结果
      result pipeTo sender  //向外部系统发送执行结果

    case BackendRegistration if !backends.contains(sender()) =>  // 添加新的后台任务节点
      context watch sender() //监控相应的任务节点
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)  // 移除已经终止运行的节点
  }
}

最后我们实现与集群客户端交互的逻辑:

class ClientJobTransformationSendingActor extends Actor {

  val initialContacts = Set(
    ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
  val settings = ClusterClientSettings(context.system)
    .withInitialContacts(initialContacts)

  val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")


  def receive = {
    case TransformationResult(result) => {
      println(s"Client response and the result is ${result}")
    }
    case Send(counter) => {
        val job = TransformationJob("hello-" + counter)
        implicit val timeout = Timeout(5 seconds)
        val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)
        result.onComplete {
          case Success(transformationResult) => {
            self ! transformationResult
          }
          case Failure(t) => println("An error has occured: " + t.getMessage)
        }
      }
  }
}

下面我们开始运行这个domo:

object DemoClient {
  def main(args : Array[String]) {

    TransformationFrontendApp.main(Seq("2551").toArray)  //启动集群客户端
    TransformationBackendApp.main(Seq("8001").toArray)   //启动三个后台节点
    TransformationBackendApp.main(Seq("8002").toArray)
    TransformationBackendApp.main(Seq("8003").toArray)

    val system = ActorSystem("OTHERSYSTEM")
    val clientJobTransformationSendingActor =
      system.actorOf(Props[ClientJobTransformationSendingActor],
        name = "clientJobTransformationSendingActor")

    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {   //定时发送任务
      clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
    }
    StdIn.readLine()
    system.terminate()
  }
}

运行结果:

akka-cluster.png

从结果可以看到,我们将任务根据算法分配给不同的后台节点进行执行,最终返回结果。

本文目的

  • 掌握集群基本概念
  • 了解学习Akka cluster的工作方式和主要角色
  • 尝试自己写一个Akka cluster的相关例子
  • 下一步进阶了解Akka cluster的背后原理

本文的demo例子已上传github:源码链接

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容