最近在学习 邓草原用akka实现的实时流式消息处理的架构,整合一下演讲和ppt,方便自己理解。
http://www.infoq.com/cn/presentations/akka-cluster-realization/
Actor模式,一种计算粒度,
-
处理
的能力(actor的行为) -
存贮
的能力(保存状态) -
通讯
的能力(消息的接口,mailbox)
Actor三定则(actor行为的改变围绕异步消息流驱动)当actor收到一条消息:
1.创建另外一些actor
2.向已知的actor发送消息
3.指定接收下一条消息的行为
Actor - 适合并行计算的最小的粒度 也是做增量计算的最小粒度
原因:
- 单个actor的状态和行为只由接收到的消息驱动
- 单个actor串行地处理接收到的消息
基于以上两点 单个actor总是线程安全的
- 大量的actor同时同时处在活跃状态,其行为是并行的
并行是多个actor的行为
在企业级应用、互联网应用中
Entity(实体通常是带状态的)应该是actor
是actor可以带来以下的优点
- 可以按需即时加载到内存
- 可以设定 Receive Timeout 自动从内存中卸载(一段时间没有接受消息)或通过编程逻辑主动卸载
- 应该持久化对状态产生影响的事件(消息)
- 可以持久化状态的快照(如果从存储中恢复actor是从一出生的日志开始replay很耗时,通过快照来自定义recover的过程)
- 按固定事件间隔持久化快照
- 从最近的快照恢复状态
- Entity = 状态快照 + 事件重演
- 不提倡 In-place Update 修改持久化后的状态。应该使用append增加状态的日志。
在 Akka中 actor的实现
- 每个actor是非常轻量的计算单元
5000万/秒 消息转发能力(单机,单核,本地)
250万 actor/GB 内存 (每个空actor约400字节) - Actor 位置透明,本身具备分布能力
无论本地还是远程节点,按地址创建查找
访问本地或远程节点,仅在于(Path)不同
可以跨节点迁移 - Actor是按层级实现督导(supervision)
actor按树状组织成层级
父actor监控子actor的状态,可以在出状况的时候停止、重启、恢复它 - 分片集群(Sharding Cluster)- Entity Actors
按Entity 的 ID分片,按需自动在响应的节点创建
消息按EntityID发送,由Resolver根据ID定位到Actor所在的 region 中,并由region 发送给actor - 持久化(Persistence) - 状态快照或事件重演
LevelDB(用于开发、测试)
HBase
分片 - IdExtractor / ShardResolver
根据实体的id按一定的规则分散到不同的节点中,下面是解决该问题定义的几个基本的type
type EntryId = String //entity id
type ShardId = String //region id 每个节点上有很多region
type Msg = Any //往每个entity actor 发送的消息
type IdExtractor = PartialFunction[Msg, (EntryId, Msg)] //id 抽取
type ShardResolver = Msg => ShardId //根据消息 转换成 region id
具体例子:
//把消息定义成Command
sealed trait Command extends Msg with Serializable {
def sessionId: String
}
// cluster 按 sessionId 与 actor 一一对应,按需即时创建或定位转发
lazy val idExtractor: ShardRegion.IdExtractor = {
case cmd: Command => (cmd.sessionId, cmd)
}
// cluster 依据 sessionId ,按一定规则,将 actor 分片到 Region
// 比如 100 个 regions , cluster 会在每个节点分配若干个 Regions
lazy val shardResolver: ShardRegion.ShardResolver = {
case cmd: Command =>
(math.abs(cmd.sessionId.hashCode) % 100).toString
//分片方式
}
持久化的实现 persist /recover
class ClusterConnectionActive(val namespaceMediator: ActorRef,
val broadcastMediator: ActorRef) extends
ConnectionActive with EventsourcedProcessor {
override def receiveRecover: Receive = {
case event: Event => updateState(event) // 重演持久化的消息历史以恢复状态
}
// 只持久化会改变状态的消息
override def receiveCommand: Receive = {
case connected: Connected =>
persist(connected)(updateState(_))
case packets: UpdatePackets =>
persist(packets)(updateState(_))
case _ => // 处理其它消息
}
def updateState(event: Event) = {//具体的业务逻辑
event match {
case x: Connected =>
connectionContext.foreach(_.bindTransport(x.transport))
case x: UpdatePackets =>
pendingPackets = immutable.Queue(x.packets: _*)
}
}
spray - socket .io 集群架构
- 无状态的连接层
连接层保持每台设备的持久连接 - 有状态的状态层
每台设备的状态由状态层对应每个设备的actor来保持
Resolver就是cluster sharding里的region
region根据ID来访问每个状态actor - 用于业务逻辑的事件流接口
所有事件和消息则发布到 Mediator,通过mediator分发推送事件和消息。每个节点上的mediator间是增量同步的。
故障场景
-
连接层节点宕机:
当设备重新连进来的时候会找还活跃的节点去新创一个连接(Transport actor)。并恢复和它保持状态的节点的联系(根据entityID找到对应的状态层actor绑定起来)。
-
状态层节点宕机:
当挂掉的节点被设备或连接层再次访问的时候,会选择正常的节点即时加载相应的 actor (首先建立mailbox),恢复状态是从Persistence的快照将事件重演,在恢复的过程中收到的消息会被暂存(stash)。恢复过程结束时会unstash处理暂存的消息和后续的消息。