说在前头的概念
MQ 是基于发布订阅模型的消息系统。 消息的订阅方订阅关注的 Topic,以获取并消费消息。 由于订阅方应用一般是分布式系统,以集群方式部署有多台机器。 因此 MQ 约定以下概念。
集群:MQ 约定使用相同 Consumer ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。
集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次
ons 包
非官方包
https://github.com/XadillaX/aliyun-ons
官方包
https://github.com/ali-sdk/ali-ons
项目实践之多服务单 Consumer ID 实现广播消费模式
从官方的描述来说,集群消费模式下,多台服务器同一个消费者ID,只会有一个消费节点能接收到消息
官方的ons包是支持广播消费的,但是项目中(node) 用到的是非官方包,并没有看到广播消费的配置参数, 想到 redis的 pub/sub ,就想到或许能模拟成广播消费的模式
MQ 部分
我觉得看 aliyun-ons 这个就可以了
redis 部分
const _ = require('lodash');
const redis = require('redis');
const {createClient} = redis;
let sub_client;
const redis_const = {
CHANNEL_LIST: ['channel_name'],
CHANNEL_PREFIX: 'XXX:PubSub:',
};
const init_sub_connection = async () => {
sub_client = createClient({
host: 'localhost',
port: 9736,
});
sub_client.select(1);
};
const init_subscribe = async () => {
const channels = _.map(redis_const.CHANNEL_LIST, (channel) => {
return redis_const.CHANNEL_PREFIX + channel;
});
await sub_client.subscribe(channels);
sub_client.on('subscribe', function (channel, count) {
});
};
const on_subscribe_event = async () => {
sub_client.on('message', function(channel, message) {
channel = channel.replace(redis_const.CHANNEL_PREFIX, '');
console.log(message);
});
};
const start = async () => {
await init_sub_connection();
await init_subscribe();
await on_subscribe_event();
};
start();
redis publish 部分
省略一堆代码
......
await redis_pub_service.publish_message({
channel: 'channel_name',
message: JSON.stringify({ status }),
});
当集群的某个消费者接收到消息时,把消息的内容通过 redis 发布到某个频道, 集群中所有订阅到这个频道的,都会接收到消息, 从而实现了自己业务系统内的广播消费