分布式事件
如果想感知某些事件,可以向Hazelcast实例注册监听器。监听器在集群范围内有效,向集群内的一个成员注册监听器,实际上是向集群内所有的成员注册了监听器。新加入成员产生的事件也会被发送给监听器。Hazelcast只生成注册了监听器的事件。如果没有监听器,也就不会有事件产生,为什么要去做没人关心的事情呢?如果在注册监听器时提供了谓词,事件在发送给监听器时首先要通过谓词,只有通过谓词的事件才能最终发送给监听器。
根据经验,不宜在监听器内实现过重的处理逻辑(会阻塞线程影响事件的处理)。如若真的需要复杂的处理逻辑,可以使用ExecutorService
异步执行。
注意: 事件并不是高可用的,在故障场景有丢失的风险。但是有一些变通的手段,比如配置容量等可以用来降低事件丢失的可能性。
Hazelcast 提供以下事件监听器。
集群事件:
- Membership Listener
- Distributed Object Listener
- Migration Listener
- Partition Lost Listener
- Lifecycle Listener
- Client Listener
分布式对象事件:
- Entry Listener
- Item Listener
- Message Listener
JCache :
- Cache Entry Listener
- ICache Partition Lost Listener
Hazelcast 客户端:
- Lifecycle Listener
- Membership Listener
- Distributed Object Listener
8.1. 集群事件
8.1.1. 监听成员事件
下面的事件会触发成员关系监听器的接口方法被调用:
memberAdded
memberRemoved
memberAttributeChanged
实现一个成员关系监听器需要实现 MembershipListener
接口。
class ClusterMembershipListener : MembershipListener {
override fun memberRemoved(event: MembershipEvent?) {
println(event?.member.toString())
}
override fun memberAdded(event: MembershipEvent?) {
println(event?.member.toString())
}
override fun memberAttributeChanged(event: MemberAttributeEvent?) {
println(event?.member.toString())
}
}
注册监听器
实现监听器类后可以使用 addMembershipListener
方法向Hazelcast注册监听器:
val instance = Hazelcast.newHazelcastInstance();
instance.cluster.addMembershipListener(ClusterMembershipListener())
使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。
代码配置:
val config = Config()
config.addListenerConfig(ListenerConfig("io.github.ctlove0523.hazelcast.data.ClusterMembershipListener"))
val instance = Hazelcast.newHazelcastInstance(config)
声明式配置:
<hazelcast>
...
<listeners>
<listener>
io.github.ctlove0523.hazelcast.data.ClusterMembershipListener
</listener>
</listeners>
...
</hazelcast>
Spring配置:
<hz:listeners>
<hz:listener class-name="io.github.ctlove0523.hazelcast.data.ClusterMembershipListener"/>
<hz:listener implementation="MembershipListener"/>
</hz:listeners>
8.1.2. 监听分布式对象事件
当集群内一个分布式对象创建和销毁时,分布式对象监听器的 distributedObjectCreated
和distributedObjectDestroyed
方法会被调用。实现一个分布式对象监听器需要实现DistributedObjectListener
接口。
class SimpleListener : DistributedObjectListener {
override fun distributedObjectCreated(event: DistributedObjectEvent?) {
println("created ${event?.distributedObject?.toString()}")
}
override fun distributedObjectDestroyed(event: DistributedObjectEvent?) {
println("destroy ${event?.distributedObject?.toString()}")
}
}
注册监听器
创建了监听器类后可以使用addDistributedObjectListener
方法向Hazelcast注册监听器:
val instance = Hazelcast.newHazelcastInstance()
instance.addDistributedObjectListener(SimpleListener())
使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。
代码配置:
val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimpleListener"))
val instance = Hazelcast.newHazelcastInstance(config)
声明式配置:
<hazelcast>
...
<listeners>
<listener>
xx.SimpleListener
</listener>
</listeners>
...
</hazelcast>
Spring配置:
<hz:listeners>
<hz:listener class-name="xx.SimpleListener"/>
<hz:listener implementation="DistributedObjectListener"/>
</hz:listeners>
8.1.3. 监听迁移事件
下列事件会触发迁移监听器接口方法:
migrationStarted
migrationFinished
replicaMigrationCompleted
replicaMigrationFailed
实现一个迁移监听器类需要实现 MigrationListener
接口:
class SimpleMigrationListener : MigrationListener {
override fun migrationFailed(p0: MigrationEvent?) {
println("migration failed ${p0?.status}")
}
override fun migrationStarted(p0: MigrationEvent?) {
println("migration started ${p0?.status}")
}
override fun migrationCompleted(p0: MigrationEvent?) {
println("migration completed ${p0?.status}")
}
}
注册监听器
可以使用 addMigrationListener
方法向Hazelcast注册监听器:
val instance = Hazelcast.newHazelcastInstance()
val partitionService = instance.partitionService
partitionService.addMigrationListener(SimpleMigrationListener())
使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。
代码配置:
val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimpleMigrationListener"))
val instance = Hazelcast.newHazelcastInstance(config)
声明式配置:
<hazelcast>
...
<listeners>
<listener>
xx.SimpleMigrationListener
</listener>
</listeners>
...
</hazelcast>
Spring配置:
<hz:listeners>
<hz:listener class-name="xx.SimpleMigrationListener"/>
<hz:listener implementation="MigrationListener"/>
</hz:listeners>
8.1.4. 监听分区丢失事件
Hazelcast通过数据多副本提供容错能力。每个分区都有唯一的拥有者,根据配置每个分区会有不同数量的副本拥有者。但是,如果一些成员同时宕机可能导致数据丢失。分区丢失侦听器通过分区丢失了多少个副本信息来通知可能发生的数据丢失。每个分区丢失都会创建分区丢失事件。在检测到一个成员崩溃并将其从群集中移除之后,将执行分区丢失检测。 请注意,可能会因网络分区错误而触发错误的PartitionLostEvent
事件。
实现分区监听器
分区监听器需要实现PartitionLostListener
接口:
class SimplePartitionLostListener: PartitionLostListener {
override fun partitionLost(p0: PartitionLostEvent?) {
println("partition lost ${p0?.toString()}")
}
}
集群产生PartitionLostEvent
事件时,分区丢失监听器将会输出分区ID,丢失的副本索引以及检测到分区丢失的集群成员。
partition lost com.hazelcast.partition.PartitionLostEvent{partitionId=202, lostBackupCount=0, eventSource=[192.168.2.105]:5701}
注册监听器
代码配置1:
val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimplePartitionLostListener"))
val instance = Hazelcast.newHazelcastInstance(config)
代码配置2:
val instance = Hazelcast.newHazelcastInstance()
val partitionService = instance.partitionService
partitionService.addPartitionLostListener(SimplePartitionLostListener())
和上面代码配置等价的声明式配置;
<hazelcast>
...
<listeners>
<listener>
xx.SimplePartitionLostListener
</listener>
</listeners>
...
</hazelcast>
8.1.5. 监听生命周期事件
生命周期监听器会收到以下事件的通知:
STARTING
STARTED
SHUTTING_DOWN
SHUTDOWN
MERGING
MERGED
CLIENT_CONNECTED
CLIENT_DISCONNECTED
实现生命周期监听器需要实现LifecycleListener
接口:
class SimpleLifecycleListener : LifecycleListener {
override fun stateChanged(p0: LifecycleEvent?) {
println("sate changed ${p0?.toString()}")
}
}
生命周期监听器是本地的,只能通知应用所在成员上发生的事件。
注册监听器
使用 addLifecycleListener
注册监听器:
val instance = Hazelcast.newHazelcastInstance()
instance.lifecycleService.addLifecycleListener(SimpleLifecycleListener())
使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。
代码配置1:
val config = Config()
config.addListenerConfig(ListenerConfig("xxx.SimpleLifecycleListener"))
val instance = Hazelcast.newHazelcastInstance(config)
声明式配置:
<hazelcast>
...
<listeners>
<listener>
xxx.SimpleLifecycleListener
</listener>
</listeners>
...
</hazelcast>
Spring配置:
<hz:listeners>
<hz:listener class-name="xxx.SimpleLifecycleListener"/>
<hz:listener implementation="LifecycleListener"/>
</hz:listeners>
8.1.6. 监听客户端
集群成员可以使用客户端监听器来感知客户端接入集群和离开集群。客户端的连接和断连只会触发客户连接的成员收到事件,集群的其他成员无法感知。
一个简单的客户端监听器:
class SimpleClientListener : ClientListener {
override fun clientDisconnected(p0: Client?) {
println("disconnected ${p0?.toString()}")
}
override fun clientConnected(p0: Client?) {
println("connected ${p0?.toString()}")
}
}
注册监听器
代码配置1:
val instance = Hazelcast.newHazelcastInstance()
instance.clientService.addClientListener(SimpleClientListener())
声明式配置:
<hazelcast>
...
<listeners>
<listener>
xxx.SimpleClientListener
</listener>
</listeners>
...
</hazelcast>
Spring配置:
<hz:listeners>
<hz:listener class-name="xxx.SimpleClientListener"/>
<hz:listener implementation="com.yourpackage.ExampleClientListener"/>
</hz:listeners>
8.2. 分布式对象事件
8.2.1. 监听Map事件
使用 MapListener
及其子接口可以监听map操作触发的事件。
捕获Map事件
如果一个类想捕获Map事件,需要实现目标事件对应的接口,比如entryAddedListener
和MapClearedListener
。
class SimpleMapListener : EntryAddedListener<String, String> {
override fun entryAdded(p0: EntryEvent<String, String>?) {
println("entry added key = ${p0?.key},value = ${p0?.value}")
}
}
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMap<String, String>("map")
map.addEntryListener(SimpleMapListener(), true)
}
创建另一个Hazelcast实例,并向map中添加几个数据严重监听器是否能够收到通知:
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMap<String, String>("map")
for (i in 1..3) {
map["key$i"] = "value$i"
}
}
监听器的输出:
entry added key = key1,value = value1
entry added key = key2,value = value2
entry added key = key3,value = value3
8.2.2. 监听Map 分区丢失
向Hazelcast注册一个MapPartitionLostListener
监听器可以监听map分区丢失事件。
class SimpleMapPartitionLostListener : MapPartitionLostListener {
override fun partitionLost(p0: MapPartitionLostEvent?) {
println("partition lost ${p0?.toString()}")
}
}
注册监听器
val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMap<String, String>("map")
map.addPartitionLostListener(SimpleMapPartitionLostListener())
使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。
代码配置:
val config = Config()
config.getMapConfig("map").addMapPartitionLostListenerConfig(MapPartitionLostListenerConfig("xxx.SimpleMapPartitionLostListener"))
声明式配置:
<hazelcast>
...
<map name="map">
<entry-listeners>
<entry-listener include-value="false" local="false">
xxx.SimpleMapPartitionLostListener
</entry-listener>
</entry-listeners>
</map>
...
</hazelcast>
Spring配置:
<hz:map name="map">
<hz:entry-listeners>
<hz:entry-listener include-value="true"
class-name="xxx.SimpleMapPartitionLostListener"/>
<hz:entry-listener implementation="dummyEntryListener" local="true"/>
</hz:entry-listeners>
</hz:map>
Map 监听器属性
include-value
,事件是否包含value,默认true
。local
,是否只监听本地事件,默认值false
。
8.2.3. 监听 MultiMap 事件
如果要监听MultiMap 产生的事件,需要实现EntryListener
接口。
class SimpleEntryListener : EntryListener<String, String> {
override fun entryEvicted(p0: EntryEvent<String, String>?) {
println("evicted ${p0?.toString()}")
}
override fun entryUpdated(p0: EntryEvent<String, String>?) {
println("updated ${p0?.toString()}")
}
override fun mapCleared(p0: MapEvent?) {
println("cleared ${p0?.toString()}")
}
override fun entryAdded(p0: EntryEvent<String, String>?) {
println("added ${p0?.toString()}")
}
override fun entryRemoved(p0: EntryEvent<String, String>?) {
println("removed ${p0?.toString()}")
}
override fun mapEvicted(p0: MapEvent?) {
println("evicted ${p0?.toString()}")
}
}
注册监听器
使用addEntryListener
方法注册:
val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMultiMap<String, String>("map")
map.addEntryListener(SimpleEntryListener(), true)
使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。
代码配置:
val config = Config()
val multiMapConfig = config.getMultiMapConfig("map")
multiMapConfig.addEntryListenerConfig(EntryListenerConfig("xxx.SimpleEntryListener", false, false))
val instance = Hazelcast.newHazelcastInstance(config)
声名式配置:
<hazelcast>
...
<multimap name="map">
<value-collection-type>SET</value-collection-type>
<entry-listeners>
<entry-listener include-value="false" local="false">
xxx.SimpleEntryListener
</entry-listener>
</entry-listeners>
</multimap>
...
</hazelcast>
Spring 配置:
<hz:multimap name="map" value-collection-type="SET">
<hz:entry-listeners>
<hz:entry-listener include-value="false"
class-name="xxx.SimpleEntryListener"/>
<hz:entry-listener implementation="EntryListener" local="false"/>
</hz:entry-listeners>
</hz:multimap>
MultiMap 监听器属性
和Map监听器属性一致。
8.2.4. 监听元素事件
IQueue
, ISet
和IList
接口使用元素监听器。实现一个元素监听器需要实现ItemListener
,添加和删除元素会触发事件。
下面是一个简单的元素监听器
class SimpleItemListener : ItemListener<String> {
override fun itemRemoved(p0: ItemEvent<String>?) {
println("item removed ${p0?.toString()}")
}
override fun itemAdded(p0: ItemEvent<String>?) {
println("item added ${p0?.toString()}")
}
}
注册监听器
创建完监听器后,可以使用addItemListener
注册监听器:
val instance = Hazelcast.newHazelcastInstance()
val set = instance.getSet<String>("set")
set.addItemListener(SimpleItemListener(), true)
使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。
代码配置:
setConfig.addItemListenerConfig(
new ItemListenerConfig( "xxx.SimpleItemListener", true ) );
声明式配置:
<hazelcast>
...
<set>
<item-listeners>
<item-listener include-value="true">
xxx.SimpleItemListener
</item-listener>
</item-listeners>
</set>
...
</hazelcast>
Spring配置:
<hz:set name="default" >
<hz:item-listeners>
<hz:item-listener include-value="true"
class-name="xxx.SimpleItemListener"/>
</hz:item-listeners>
</hz:set>
元素监听器属性
-
include-value
:事件是否包含value,默认为true
。 -
local
:是否只监听本地成员事件,默认值为false
。
8.2.5. 监听主题消息
ITopic
接口使用消息监听器用来在收到主题对应的消息时通知注册的监听器。实现一个消息监听器需要实现MessageListener
。
class SimpleMessageListener : MessageListener<String> {
override fun onMessage(p0: Message<String>?) {
println("get message ${p0?.toString()}")
}
}
注册监听器
创建完监听器后,可以使用addMessageListener
向Hazelcast注册监听器:
val instance = Hazelcast.newHazelcastInstance()
val topic = instance.getTopic<String>("topic")
topic.addMessageListener(SimpleMessageListener())
使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。
代码配置:
val config = Config()
config.getTopicConfig("topic").addMessageListenerConfig(ListenerConfig("xxx.SimpleMessageListener"))
声明式配置:
<hazelcast>
...
<topic name="default">
<message-listeners>
<message-listener>
xxx.SimpleMessageListener
</message-listener>
</message-listeners>
</topic>
...
</hazelcast>
Spring配置:
<hz:topic name="default">
<hz:message-listeners>
<hz:message-listener
class-name="xxx.SimpleMessageListener"/>
</hz:message-listeners>
</hz:topic>
8.3. 全局事件配置
-
hazelcast.event.queue.capacity
: 默认值1000000 -
hazelcast.event.queue.timeout.millis
: 默认值 250 -
hazelcast.event.thread.count
: 默认值 5
集群成员中的executor控制并调度接收到的事件,同时也负责保证事件的有序性。对于Hazelcast中的所有事件,对于给定的key,可以保证事件生成的顺序和事件发布的顺序一致。对于map和multimap来说,对于同一个key的操作顺序可以保证。对于list,set,topic和queue,事件的顺序和操作的顺序一致。
如果事件队列达到容量限制(hazelcast.event.queue.capacity
) ,最后一个事件无法在hazelcast.event.queue.timeout.millis
内插入事件队列,这些事件将会被丢弃并发出一个警告信息“EventQueue overload”。
为了实现事件的有序性,StripedExecutor
中的每一个线程负责处理事件的一部分。如果监听器执行的计算非常耗时,这有可能导致事件队列达到容量限制并丢失事件。对于map和multimap,可以将hazelcast.event.thread.count
配置为更高的值,以减少键冲突,因此,工作线程在StripedExecutor
中不会相互阻塞。对于list,set,topic和queue,必须把负载重的工作提交到其他线程中处理。为了保证事件的顺序,在其他线程中应该实现StripedExecutor
中同样的逻辑。