【Hazelcast系列十一】分布式事件

分布式事件

如果想感知某些事件,可以向Hazelcast实例注册监听器。监听器在集群范围内有效,向集群内的一个成员注册监听器,实际上是向集群内所有的成员注册了监听器。新加入成员产生的事件也会被发送给监听器。Hazelcast只生成注册了监听器的事件。如果没有监听器,也就不会有事件产生,为什么要去做没人关心的事情呢?如果在注册监听器时提供了谓词,事件在发送给监听器时首先要通过谓词,只有通过谓词的事件才能最终发送给监听器。

根据经验,不宜在监听器内实现过重的处理逻辑(会阻塞线程影响事件的处理)。如若真的需要复杂的处理逻辑,可以使用ExecutorService 异步执行。

注意: 事件并不是高可用的,在故障场景有丢失的风险。但是有一些变通的手段,比如配置容量等可以用来降低事件丢失的可能性。

Hazelcast 提供以下事件监听器。

集群事件:

  • Membership Listener
  • Distributed Object Listener
  • Migration Listener
  • Partition Lost Listener
  • Lifecycle Listener
  • Client Listener

分布式对象事件:

  • Entry Listener
  • Item Listener
  • Message Listener

JCache :

  • Cache Entry Listener
  • ICache Partition Lost Listener

Hazelcast 客户端:

  • Lifecycle Listener
  • Membership Listener
  • Distributed Object Listener

8.1. 集群事件

8.1.1. 监听成员事件

下面的事件会触发成员关系监听器的接口方法被调用:

  • memberAdded
  • memberRemoved
  • memberAttributeChanged

实现一个成员关系监听器需要实现 MembershipListener 接口。

class ClusterMembershipListener : MembershipListener {
    override fun memberRemoved(event: MembershipEvent?) {
        println(event?.member.toString())
    }

    override fun memberAdded(event: MembershipEvent?) {
        println(event?.member.toString())
    }

    override fun memberAttributeChanged(event: MemberAttributeEvent?) {
        println(event?.member.toString())
    }

}
注册监听器

实现监听器类后可以使用 addMembershipListener 方法向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance();
instance.cluster.addMembershipListener(ClusterMembershipListener())

使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。

代码配置:

val config = Config()
config.addListenerConfig(ListenerConfig("io.github.ctlove0523.hazelcast.data.ClusterMembershipListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            io.github.ctlove0523.hazelcast.data.ClusterMembershipListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="io.github.ctlove0523.hazelcast.data.ClusterMembershipListener"/>
    <hz:listener implementation="MembershipListener"/>
</hz:listeners>

8.1.2. 监听分布式对象事件

当集群内一个分布式对象创建和销毁时,分布式对象监听器的 distributedObjectCreateddistributedObjectDestroyed 方法会被调用。实现一个分布式对象监听器需要实现DistributedObjectListener接口。

class SimpleListener : DistributedObjectListener {
    override fun distributedObjectCreated(event: DistributedObjectEvent?) {
        println("created ${event?.distributedObject?.toString()}")
    }

    override fun distributedObjectDestroyed(event: DistributedObjectEvent?) {
        println("destroy ${event?.distributedObject?.toString()}")
    }

}
注册监听器

创建了监听器类后可以使用addDistributedObjectListener 方法向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance()
instance.addDistributedObjectListener(SimpleListener())

使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。

代码配置:

val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimpleListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xx.SimpleListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xx.SimpleListener"/>
    <hz:listener implementation="DistributedObjectListener"/>
</hz:listeners>

8.1.3. 监听迁移事件

下列事件会触发迁移监听器接口方法:

  • migrationStarted
  • migrationFinished
  • replicaMigrationCompleted
  • replicaMigrationFailed

实现一个迁移监听器类需要实现 MigrationListener 接口:

class SimpleMigrationListener : MigrationListener {
    override fun migrationFailed(p0: MigrationEvent?) {
        println("migration failed ${p0?.status}")
    }

    override fun migrationStarted(p0: MigrationEvent?) {
        println("migration started ${p0?.status}")
    }

    override fun migrationCompleted(p0: MigrationEvent?) {
        println("migration completed ${p0?.status}")
    }

}
注册监听器

可以使用 addMigrationListener方法向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance()
val partitionService = instance.partitionService
partitionService.addMigrationListener(SimpleMigrationListener())

使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。

代码配置:

val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimpleMigrationListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xx.SimpleMigrationListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xx.SimpleMigrationListener"/>
    <hz:listener implementation="MigrationListener"/>
</hz:listeners>

8.1.4. 监听分区丢失事件

Hazelcast通过数据多副本提供容错能力。每个分区都有唯一的拥有者,根据配置每个分区会有不同数量的副本拥有者。但是,如果一些成员同时宕机可能导致数据丢失。分区丢失侦听器通过分区丢失了多少个副本信息来通知可能发生的数据丢失。每个分区丢失都会创建分区丢失事件。在检测到一个成员崩溃并将其从群集中移除之后,将执行分区丢失检测。 请注意,可能会因网络分区错误而触发错误的PartitionLostEvent事件。

实现分区监听器

分区监听器需要实现PartitionLostListener 接口:

class SimplePartitionLostListener: PartitionLostListener {
    override fun partitionLost(p0: PartitionLostEvent?) {
        println("partition lost ${p0?.toString()}")
    }
}

集群产生PartitionLostEvent 事件时,分区丢失监听器将会输出分区ID,丢失的副本索引以及检测到分区丢失的集群成员。

partition lost com.hazelcast.partition.PartitionLostEvent{partitionId=202, lostBackupCount=0, eventSource=[192.168.2.105]:5701}
注册监听器

代码配置1:

val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimplePartitionLostListener"))
val instance = Hazelcast.newHazelcastInstance(config)

代码配置2:

val instance = Hazelcast.newHazelcastInstance()
val partitionService = instance.partitionService
partitionService.addPartitionLostListener(SimplePartitionLostListener())

和上面代码配置等价的声明式配置;

<hazelcast>
    ...
    <listeners>
        <listener>
            xx.SimplePartitionLostListener
        </listener>
    </listeners>
    ...
</hazelcast>

8.1.5. 监听生命周期事件

生命周期监听器会收到以下事件的通知:

  • STARTING
  • STARTED
  • SHUTTING_DOWN
  • SHUTDOWN
  • MERGING
  • MERGED
  • CLIENT_CONNECTED
  • CLIENT_DISCONNECTED

实现生命周期监听器需要实现LifecycleListener 接口:

class SimpleLifecycleListener : LifecycleListener {
    override fun stateChanged(p0: LifecycleEvent?) {
        println("sate changed ${p0?.toString()}")
    }
}

生命周期监听器是本地的,只能通知应用所在成员上发生的事件。

注册监听器

使用 addLifecycleListener注册监听器:

val instance = Hazelcast.newHazelcastInstance()
instance.lifecycleService.addLifecycleListener(SimpleLifecycleListener())

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置1:

val config = Config()
config.addListenerConfig(ListenerConfig("xxx.SimpleLifecycleListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xxx.SimpleLifecycleListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xxx.SimpleLifecycleListener"/>
    <hz:listener implementation="LifecycleListener"/>
</hz:listeners>

8.1.6. 监听客户端

集群成员可以使用客户端监听器来感知客户端接入集群和离开集群。客户端的连接和断连只会触发客户连接的成员收到事件,集群的其他成员无法感知。

一个简单的客户端监听器:

class SimpleClientListener : ClientListener {
    override fun clientDisconnected(p0: Client?) {
        println("disconnected ${p0?.toString()}")
    }

    override fun clientConnected(p0: Client?) {
        println("connected ${p0?.toString()}")
    }
}
注册监听器

代码配置1:

val instance = Hazelcast.newHazelcastInstance()
instance.clientService.addClientListener(SimpleClientListener())

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xxx.SimpleClientListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xxx.SimpleClientListener"/>
    <hz:listener implementation="com.yourpackage.ExampleClientListener"/>
</hz:listeners>

8.2. 分布式对象事件

8.2.1. 监听Map事件

使用 MapListener 及其子接口可以监听map操作触发的事件。

捕获Map事件

如果一个类想捕获Map事件,需要实现目标事件对应的接口,比如entryAddedListenerMapClearedListener

class SimpleMapListener : EntryAddedListener<String, String> {
    override fun entryAdded(p0: EntryEvent<String, String>?) {
        println("entry added key = ${p0?.key},value = ${p0?.value}")
    }
}

fun main() = runBlocking<Unit> {
    val instance = Hazelcast.newHazelcastInstance()
    val map = instance.getMap<String, String>("map")
    map.addEntryListener(SimpleMapListener(), true)
}

创建另一个Hazelcast实例,并向map中添加几个数据严重监听器是否能够收到通知:

fun main() = runBlocking<Unit> {
    val instance = Hazelcast.newHazelcastInstance()
    val map = instance.getMap<String, String>("map")
    for (i in 1..3) {
        map["key$i"] = "value$i"
    }
}

监听器的输出:

entry added key = key1,value = value1
entry added key = key2,value = value2
entry added key = key3,value = value3

8.2.2. 监听Map 分区丢失

向Hazelcast注册一个MapPartitionLostListener监听器可以监听map分区丢失事件。

class SimpleMapPartitionLostListener : MapPartitionLostListener {
    override fun partitionLost(p0: MapPartitionLostEvent?) {
        println("partition lost ${p0?.toString()}")
    }
}
注册监听器
val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMap<String, String>("map")
map.addPartitionLostListener(SimpleMapPartitionLostListener())

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

val config = Config()
config.getMapConfig("map").addMapPartitionLostListenerConfig(MapPartitionLostListenerConfig("xxx.SimpleMapPartitionLostListener"))

声明式配置:

<hazelcast>
    ...
    <map name="map">
        <entry-listeners>
            <entry-listener include-value="false" local="false">
                xxx.SimpleMapPartitionLostListener
            </entry-listener>
        </entry-listeners>
    </map>
    ...
</hazelcast>

Spring配置:

<hz:map name="map">
    <hz:entry-listeners>
        <hz:entry-listener include-value="true"
            class-name="xxx.SimpleMapPartitionLostListener"/>
        <hz:entry-listener implementation="dummyEntryListener" local="true"/>
    </hz:entry-listeners>
</hz:map>
Map 监听器属性
  • include-value ,事件是否包含value,默认true

  • local ,是否只监听本地事件,默认值 false

8.2.3. 监听 MultiMap 事件

如果要监听MultiMap 产生的事件,需要实现EntryListener接口。

class SimpleEntryListener : EntryListener<String, String> {
    override fun entryEvicted(p0: EntryEvent<String, String>?) {
        println("evicted ${p0?.toString()}")
    }

    override fun entryUpdated(p0: EntryEvent<String, String>?) {
        println("updated ${p0?.toString()}")
    }

    override fun mapCleared(p0: MapEvent?) {
        println("cleared ${p0?.toString()}")
    }

    override fun entryAdded(p0: EntryEvent<String, String>?) {
        println("added ${p0?.toString()}")
    }

    override fun entryRemoved(p0: EntryEvent<String, String>?) {
        println("removed ${p0?.toString()}")
    }

    override fun mapEvicted(p0: MapEvent?) {
        println("evicted ${p0?.toString()}")
    }

}
注册监听器

使用addEntryListener 方法注册:

val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMultiMap<String, String>("map")
map.addEntryListener(SimpleEntryListener(), true)

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

val config = Config()
val multiMapConfig = config.getMultiMapConfig("map")
multiMapConfig.addEntryListenerConfig(EntryListenerConfig("xxx.SimpleEntryListener", false, false))
val instance = Hazelcast.newHazelcastInstance(config)

声名式配置:

<hazelcast>
    ...
    <multimap name="map">
        <value-collection-type>SET</value-collection-type>
        <entry-listeners>
            <entry-listener include-value="false" local="false">
                xxx.SimpleEntryListener
            </entry-listener>
        </entry-listeners>
    </multimap>
    ...
</hazelcast>

Spring 配置:

<hz:multimap name="map" value-collection-type="SET">
    <hz:entry-listeners>
        <hz:entry-listener include-value="false"
            class-name="xxx.SimpleEntryListener"/>
        <hz:entry-listener implementation="EntryListener" local="false"/>
    </hz:entry-listeners>
</hz:multimap>
MultiMap 监听器属性

和Map监听器属性一致。

8.2.4. 监听元素事件

IQueue, ISetIList 接口使用元素监听器。实现一个元素监听器需要实现ItemListener ,添加和删除元素会触发事件。

下面是一个简单的元素监听器

class SimpleItemListener : ItemListener<String> {
    override fun itemRemoved(p0: ItemEvent<String>?) {
        println("item removed ${p0?.toString()}")
    }

    override fun itemAdded(p0: ItemEvent<String>?) {
        println("item added ${p0?.toString()}")
    }
}
注册监听器

创建完监听器后,可以使用addItemListener 注册监听器:

val instance = Hazelcast.newHazelcastInstance()
val set = instance.getSet<String>("set")
set.addItemListener(SimpleItemListener(), true)

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

setConfig.addItemListenerConfig(
new ItemListenerConfig( "xxx.SimpleItemListener", true ) );

声明式配置:

<hazelcast>
    ...
    <set>
        <item-listeners>
            <item-listener include-value="true">
                xxx.SimpleItemListener
            </item-listener>
        </item-listeners>
    </set>
    ...
</hazelcast>

Spring配置:

<hz:set name="default" >
    <hz:item-listeners>
        <hz:item-listener include-value="true"
            class-name="xxx.SimpleItemListener"/>
    </hz:item-listeners>
</hz:set>
元素监听器属性
  • include-value:事件是否包含value,默认为true
  • local:是否只监听本地成员事件,默认值为false

8.2.5. 监听主题消息

ITopic 接口使用消息监听器用来在收到主题对应的消息时通知注册的监听器。实现一个消息监听器需要实现MessageListener

class SimpleMessageListener : MessageListener<String> {
    override fun onMessage(p0: Message<String>?) {
        println("get message ${p0?.toString()}")
    }
}
注册监听器

创建完监听器后,可以使用addMessageListener 向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance()
val topic = instance.getTopic<String>("topic")
topic.addMessageListener(SimpleMessageListener())

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

val config = Config()
config.getTopicConfig("topic").addMessageListenerConfig(ListenerConfig("xxx.SimpleMessageListener"))

声明式配置:

<hazelcast>
    ...
    <topic name="default">
        <message-listeners>
            <message-listener>
                xxx.SimpleMessageListener
            </message-listener>
        </message-listeners>
    </topic>
    ...
</hazelcast>

Spring配置:

<hz:topic name="default">
    <hz:message-listeners>
        <hz:message-listener
            class-name="xxx.SimpleMessageListener"/>
    </hz:message-listeners>
</hz:topic>

8.3. 全局事件配置

  • hazelcast.event.queue.capacity: 默认值1000000
  • hazelcast.event.queue.timeout.millis: 默认值 250
  • hazelcast.event.thread.count: 默认值 5

集群成员中的executor控制并调度接收到的事件,同时也负责保证事件的有序性。对于Hazelcast中的所有事件,对于给定的key,可以保证事件生成的顺序和事件发布的顺序一致。对于map和multimap来说,对于同一个key的操作顺序可以保证。对于list,set,topic和queue,事件的顺序和操作的顺序一致。

如果事件队列达到容量限制(hazelcast.event.queue.capacity) ,最后一个事件无法在hazelcast.event.queue.timeout.millis内插入事件队列,这些事件将会被丢弃并发出一个警告信息“EventQueue overload”。

为了实现事件的有序性,StripedExecutor中的每一个线程负责处理事件的一部分。如果监听器执行的计算非常耗时,这有可能导致事件队列达到容量限制并丢失事件。对于map和multimap,可以将hazelcast.event.thread.count配置为更高的值,以减少键冲突,因此,工作线程在StripedExecutor中不会相互阻塞。对于list,set,topic和queue,必须把负载重的工作提交到其他线程中处理。为了保证事件的顺序,在其他线程中应该实现StripedExecutor中同样的逻辑。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容