典型使用场景
程序中含有大量有状态的actor,这些actor需消耗的资源超过单台服务器的能力。
在此场景下,actor会自动分布到不同的节点上(并且也只在一个节点上);通过ShardRegion
actor可实现消息路由, 而消息发送方无需知道actor的具体位置。
Maven依赖
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-sharding_2.12</artifactId>
<version>2.5.12</version>
</dependency>
使用方法
- 配置:可通过代码,或采用配置文件
- 在
ClusterSharding.start
方法中注册需sharding的actor,该方法会创建ShardRegion( 如果之前不存在),并返回ActorRef,如下:
import akka.japi.Option;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
Option<String> roleOption = Option.none();
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), settings, messageExtractor);
- 通过
ShardRegion.MessageExtractor
从消息中获取Stateful Actor及Shard的identifier(需自己实习逻辑)。 - shard数量应是node数量的10倍,并且sharding算法在一个集群中的所有node应该一致。
- 在Akka中已经提供了一个简单的sharding算法,
ShardRegion.HashCodeMessageExtractor
:r = A mod B,A为Stateful Actor idnentifier的hashCode
的绝对值A,B为shard数量;该算法在很多情况下可以良好工作。 - 停止ShardRegion: 发送
ShardRegion.gracefulShutdownInstance
消息
工作原理
官网解释得相当清楚:
The ShardRegion
actor is started on each node in the cluster, or group of nodes tagged with a specific role. The ShardRegion
is created with two application specific functions to extract the entity identifier and the shard identifier from incoming messages. A shard is a group of entities that will be managed together. For the first message in a specific shard the ShardRegion
requests the location of the shard from a central coordinator, the ShardCoordinator
.
The ShardCoordinator
decides which ShardRegion
shall own the Shard
and informs that ShardRegion
. The region will confirm this request and create the Shard
supervisor as a child actor. The individual Entities
will then be created when needed by the Shard
actor. Incoming messages thus travel via the ShardRegion
and the Shard
to the target Entity
.
If the shard home is another ShardRegion
instance messages will be forwarded to that ShardRegion
instance instead. While resolving the location of a shard incoming messages for that shard are buffered and later delivered when the shard home is known. Subsequent messages to the resolved shard can be delivered to the target destination immediately without involving the ShardCoordinator
.
简单来说,ShardCoordinator
决定ShardRegion
拥有哪个并创建Shard
,Shard
负责创建具体的Entities
(即Stateful Actor
);消息首先通过ShardRegion
,如果ShardRegion
已知Shard
地址,则发送消息至该Shard
(可能会通过ShardRegion
),如果不知地址,则从ShardCoordinator
获取并缓存。
ShardCoordinator
为cluster singleton,保存所有shard分配信息。
ShardCoordinator
数据模式
- 分布式数据(Distributed Data):默认模式,数据非持久化,集群停止后状态数据会丢失。
相当于:akka.cluster.sharding.state-store-mode = ddata
- 在此模式中,
Remembering Entities
的数据还是持久化,不受集群状态影响。
- 在此模式中,
- 持久化模式(Persistence Mode):需要通过配置启用
akka.cluster.sharding.state-store-mode = persistence
CusterSharding配置(文件方式)
# Settings for the ClusterShardingExtension
akka.cluster.sharding {
# The extension creates a top level actor with this name in top level system scope,
# e.g. '/system/sharding'
guardian-name = sharding
# Specifies that entities runs on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
role = ""
# When this is set to 'on' the active entity actors will automatically be restarted
# upon Shard restart. i.e. if the Shard is started on a different ShardRegion
# due to rebalance or crash.
remember-entities = off
# If the coordinator can't store state changes it will be stopped
# and started again after this duration, with an exponential back-off
# of up to 5 times this duration.
coordinator-failure-backoff = 5 s
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
retry-interval = 2 s
# Maximum number of messages that are buffered by a ShardRegion actor.
buffer-size = 100000
# Timeout of the shard rebalancing process.
handoff-timeout = 60 s
# Time given to a region to acknowledge it's hosting a shard.
shard-start-timeout = 10 s
# If the shard is remembering entities and can't store state changes
# will be stopped and then started again after this duration. Any messages
# sent to an affected entity may be lost in this process.
shard-failure-backoff = 10 s
# If the shard is remembering entities and an entity stops itself without
# using passivate. The entity will be restarted after this duration or when
# the next message for it is received, which ever occurs first.
entity-restart-backoff = 10 s
# Rebalance check is performed periodically with this interval.
rebalance-interval = 10 s
# Absolute path to the journal plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default journal plugin is used. Note that this is not related to
# persistence used by the entity actors.
# Only used when state-store-mode=persistence
journal-plugin-id = ""
# Absolute path to the snapshot plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default snapshot plugin is used. Note that this is not related to
# persistence used by the entity actors.
# Only used when state-store-mode=persistence
snapshot-plugin-id = ""
# Defines how the coordinator stores its state. Same is also used by the
# shards for rememberEntities.
# Valid values are "ddata" or "persistence".
state-store-mode = "ddata"
# The shard saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# Only used when state-store-mode=persistence
snapshot-after = 1000
# The shard deletes persistent events (messages and snapshots) after doing snapshot
# keeping this number of old persistent batches.
# Batch is of size `snapshot-after`.
# When set to 0 after snapshot is successfully done all messages with equal or lower sequence number will be deleted.
# Default value of 2 leaves last maximum 2*`snapshot-after` messages and 3 snapshots (2 old ones + fresh snapshot)
keep-nr-of-batches = 2
# Setting for the default shard allocation strategy
least-shard-allocation-strategy {
# Threshold of how large the difference between most and least number of
# allocated shards must be to begin the rebalancing.
rebalance-threshold = 10
# The number of ongoing rebalancing processes is limited to this number.
max-simultaneous-rebalance = 3
}
# Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened)
# Only used when state-store-mode=ddata
waiting-for-state-timeout = 5 s
# Timeout of waiting for update the distributed state (update will be retried if the timeout happened)
# Only used when state-store-mode=ddata
updating-state-timeout = 5 s
# The shard uses this strategy to determines how to recover the underlying entity actors. The strategy is only used
# by the persistent shard when rebalancing or restarting. The value can either be "all" or "constant". The "all"
# strategy start all the underlying entity actors at the same time. The constant strategy will start the underlying
# entity actors at a fix rate. The default strategy "all".
entity-recovery-strategy = "all"
# Default settings for the constant rate entity recovery strategy
entity-recovery-constant-rate-strategy {
# Sets the frequency at which a batch of entity actors is started.
frequency = 100 ms
# Sets the number of entity actors to be restart at a particular interval
number-of-entities = 5
}
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
# The "role" of the singleton configuration is not used. The singleton role will
# be the same as "akka.cluster.sharding.role".
coordinator-singleton = ${akka.cluster.singleton}
# Settings for the Distributed Data replicator.
# Same layout as akka.cluster.distributed-data.
# The "role" of the distributed-data configuration is not used. The distributed-data
# role will be the same as "akka.cluster.sharding.role".
# Note that there is one Replicator per role and it's not possible
# to have different distributed-data settings for different sharding entity types.
# Only used when state-store-mode=ddata
distributed-data = ${akka.cluster.distributed-data}
distributed-data {
# minCap parameter to MajorityWrite and MajorityRead consistency level.
majority-min-cap = 5
durable.keys = ["shard-*"]
# When using many entities with "remember entities" the Gossip message
# can become to large if including to many in same message. Limit to
# the same number as the number of ORSet per shard.
max-delta-elements = 5
}
# The id of the dispatcher to use for ClusterSharding actors.
# If not specified default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
use-dispatcher = ""
}