Hazelcast支持常用的publish/subscribe消息模型。发布和订阅都是集群级操作,一个member订阅主题,其实是订阅了集群内所有member发布到主题的消息,即使member是在订阅之后加入集群。
发布/订阅主题
发布消息:
fun main() = runBlocking {
val hz: HazelcastInstance = Hazelcast.newHazelcastInstance()
val topic = hz.getTopic<String>("topic")
topic.publish("hello")
}
订阅消息:
fun main() = runBlocking<Unit> {
val hz: HazelcastInstance = Hazelcast.newHazelcastInstance()
val topic = hz.getTopic<String>("topic")
topic.addMessageListener(MessageListener {
println(it.messageObject)
})
}
统计信息
主题两个统计量:发布的消息数和接收的消息数(member启动之后),只反映本节点的统计信息,非全局统计信息。
fun main() = runBlocking {
val hz: HazelcastInstance = Hazelcast.newHazelcastInstance()
val topic = hz.getTopic<String>("topic")
topic.publish("hello hazelcast")
topic.addMessageListener(MessageListener {
println(it.messageObject)
})
println(topic.localTopicStats.publishOperationCount)
println(topic.localTopicStats.receiveOperationCount)
}
注意:统计数据没有备份,如果member下线,统计数据会丢失。
理解主题的几个行为
集群的所有成员都有集群内订阅的列表。当一个新成员订阅一个主题时,该成员会向集群所有的成员发送该订阅消息,如果一个新成员加入集群,新成员也会收到目前集群内所有的订阅信息。主题的行为受参数 globalOrderEnabled
的影响。
消息消费的顺序和发布顺序一致
如果 globalOrderEnabled
设置为false,消息不会被排序,监听器将会按照消息发布的顺序处理消息。下面举一个简单的例子进行说明。假设集群有三个成员分别是member1,member2和member3,其中member1和member2订阅了主题message
,member1依次向主题message
发布消息a1,a2,member3依次向主题message
发布消息 c1,c2。member1和member2接收到的消息顺序可能是下面的顺序:
member1 → c1
, a1
, a2
, c2
member2 → c1
, c2
, a1
, a2
只能保证每个member发布的消息按顺序处理,不能保证不同member消息之间的处理顺序。
所有成员以相同的顺序处理消息
如果globalOrderEnabled
设置为true,监听同一个主题的所有member得到的消息的顺序是一致。上面的例子member1和member2接收消息的顺序将是相同的。
member1 → a1
, c1
, a2
, c2
member2 → a1
, c1
, a2
, c2
保证生产和发布消息的顺序一致
在两种场景下,StripeExecutor
负责消息的分发和接收。集群所有的事件的生成顺序和发布顺序的一致都是由StripeExecutor
保证。StripeExecutor
中线程的数量由参数hazelcast.event.thread.count
配置,默认线程数为5。事件源和线程的映射关系为:hash(source's name) % 5。多个事件源可能共享一个线程,因此为了不阻塞其他消息,监听器中不能有太重的处理逻辑,如果可以提交到其他线程异步处理。
配置主题
对于一个主题可以配置:名字,是否收集统计信息,全局有序性和消息监听器。其中有两项参数拥有默认值:
-
global-ordering
= false,不保证全局有序。 -
statistics
=true, 收集主题统计信息。
下面是使用xml对主题进行配置的样例。
<hazelcast>
...
<topic name="topic">
<global-ordering-enabled>true</global-ordering-enabled>
<statistics-enabled>true</statistics-enabled>
<message-listeners>
<message-listener>MessageListenerImpl</message-listener>
</message-listeners>
</topic>
...
</hazelcast>
主题配置支持一下元素:
-
statistics-enabled
: 是否收集统计信息,默认为true。 -
global-ordering-enabled
: 是否保证全局有序,默认值false。 -
message-listeners
: 主题监听器。
除去上面的配置下面的系统熟悉也和主题相关,但是不是主题所特有的配置:
-
hazelcast.event.queue.capacity
默认值1,000,000 -
hazelcast.event.queue.timeout.millis
默认值 250 -
hazelcast.event.thread.count
默认值 5