微服务异步架构——MQ之RocketMQ

“我们大家都知道把一个微服务架构变成一个异步架构只需要加一个MQ,现在市面上有很多MQ的开源框架。到底选择哪一个MQ的开源框架才合适呢 ”

一、什么是MQ?MQ的原理是什么?

MQ就是消息队列,是Message Queue的缩写。消息队列是一种通信方式。消息的本质就是一种数据结构。因为MQ把项目中的消息集中式的处理和存储,所以MQ主要有解耦,并发,和削峰的功能。

1.1 解耦

MQ的消息生产者和消费者互相不关心对方是否存在,通过MQ这个中间件的存在,使整个系统达到解耦的作用。

如果服务之间用RPC通信,当一个服务跟几百个服务通信时,如果那个服务的通信接口改变,那么几百个服务的通信接口都的跟着变动,这是非常头疼的一件事。

但是采用MQ之后,不管是生产者或者消费者都可以单独改变自己。他们的改变不会影响到别的服务。从而达到解耦的目的。为什么要解耦呢?说白了就是方便,减少不必要的工作量。

1.2 并发

MQ有生产者集群和消费者集群,所以客户端是亿级用户时,他们都是并行的。从而大大提升响应速度。

1.3 削峰

因为MQ能存储的消息量很大,所以他可以把大量的消息请求先存下了,然后再并发的方式慢慢处理。

如果采用RPC通信,每一次请求用调用RPC接口,当请求量巨大的时候,因为RPC的请求是很耗资源的,所以巨大的请求一定会压垮服务器。

削峰的目的是用户体验变好,并且使整个系统稳定。能承受大量请求消息。

二、现在市面上有什么MQ

重点介绍RocketMQ

现在市面上的MQ有很多,主要有RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ,Kafka等等,这些都是开源的MQ产品。以前很多人推荐使用RabbitMQ,他也是非常好用的MQ产品,这里不做过多的介绍。Kafka也是高吞吐量的老大,我们这里也不介绍。

我们重点介绍一下RocketMQ,RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache软件基金会,并于并于2017年9月25日成为 Apache 的顶级项目。

作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

功能概览图

可以看见RocketMQ支持定时和延时消息,这是RabbitMQ所没有的能力。

RocketMQ的物理结构

从这里可以看出,RocketMQ涉及到四大集群,producer,Name Server,Consumer,Broker。

2.1 Producer集群:

是生产者集群,负责产生消息,向消费者发送由业务应用程序系统生成的消息,RocketMQ提供三种方式发送消息:同步,异步,单向。

2.1.1 普通消息

2.1.1.1 同步原理图

同步消息关键代码

try {
    SendResult sendResult =
    producer.send(msg);
    // 同步发送消息,
    只要不抛异常就是成功 if
    (sendResult != null) {
        System.out.println
        (new Date() + " Send mq message success.
Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
    }
    catch (Exception e) {
        System.out.println
        (new Date() + " Send mq message failed.
Topic is:" + msg.getTopic());
        e.printStackTrace();
    }
}

2.1.1.2 异步原理图

异步消息关键代码

producer.sendAsync(msg, new SendCallback()
{
    @Overridepublic void onSuccess
    (final SendResult sendResult)
    {
        // 消费发送成功 System.out.println
        ("send message success. topic=" +
        sendResult.getTopic() + ", msgId="
        + sendResult.getMessageId());
    }
    @Overridepublic void onException
    (OnExceptionContext context)
    {
        System.out.println("send message failed.
topic=" + context.getTopic() + ",
msgId=" + context.getMessageId());
    }
}
);

2.1.1.3 单向(Oneway)发送原理图

单向只发送,不等待返回,所以速度最快,一般在微秒级,但可能丢失

单向(Oneway)发送消息关键代码

producer.sendOneway(msg);

2.1.2 定时消息和延时消息

发送定时消息关键代码

try {
    // 定时消息,单位毫秒(ms),
    在指定时间戳(当前时间之后)进行投递,
    例如 2016-03-07 16:21:00 投递。
    如果被设置成当前时间戳之前的某个时刻,
    消息将立刻投递给消费者。 long timeStamp
    = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    .parse("2016-03-07 16:21:00").getTime();
    msg.setStartDeliverTime(timeStamp);
    // 发送消息,只要不抛异常就是成功
    SendResult sendResult = producer.send(msg);
    System.out.println
    ("MessageId:"+sendResult.getMessageId());
}
catch (Exception e) {
    // 消息发送失败,
    需要进行重试处理,可重新发送这条消息
    或持久化这条数据进行补偿处理
    System.out.println(new Date()
    + " Send mq message failed.
Topic is:" + msg.getTopic());
    e.printStackTrace();
}

发送延时消息关键代码

try {
    // 延时消息,单位毫秒(ms),
    在指定延迟时间(当前时间之后)进行投递,
    例如消息在 3 秒后投递 long delayTime
    = System.currentTimeMillis() + 3000;
    // 设置消息需要被投递的时间
    msg.setStartDeliverTime(delayTime);
    SendResult sendResult = producer.send(msg);
    // 同步发送消息,只要不抛异常就是成功
    if (sendResult != null)
    {
        System.out.println(new Date()
        + " Send mq message success.
Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
    }
}
catch (Exception e) {
    // 消息发送失败,
    需要进行重试处理,可重新发送这条消息
    或持久化这条数据进行补偿处理
    System.out.println(new Date()
    + " Send mq message failed.
Topic is:" + msg.getTopic());
    e.printStackTrace();
}

2.2 注意事项

①. 定时和延时消息的 msg.setStartDeliverTime 参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。

②. 定时和延时消息的 msg.setStartDeliverTime 参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败。

③. StartDeliverTime 是服务端开始向消费端投递的时间。 如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。

④. 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。

⑤. 设置定时和延时消息的投递时间后,依然受 3 天的消息保存时长限制。例如,设置定时消息 5 天后才能被消费,如果第 5 天后一直没被消费,那么这条消息将在第8天被删除。

⑥. 除 Java 语言支持延时消息外,其他语言都不支持延时消息。

发布消息原理图

三、事务消息

RocketMQ提供类似X/Open XA的分布式事务功能来确保业务发送方和MQ消息的最终一致性,其本质是通过半消息的方式把分布式事务放在MQ端来处理。

原理图

其中:

①. 发送方向消息队列 RocketMQ 服务端发送消息。

②. 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

③. 发送方开始执行本地事务逻辑。

④. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。

⑤. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

⑥. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

⑦. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作。

3.1 RocketMQ的半消息机制的注意事项是

①. 根据第六步可以看出他要求发送方提供业务回查接口。

②. 不能保证发送方的消息幂等,在ack没有返回的情况下,可能存在重复消息

③. 消费方要做幂等处理。

3.2 核心代码

final BusinessService businessService = new BusinessService(); // 本地业务

TransactionProducer producer
= ONSFactory.createTransactionProducer
(properties,new LocalTransactionCheckerImpl());
producer.start();
Message msg = new Message
("Topic", "TagA", "Hello MQ transaction===".
getBytes());
try {
    SendResult sendResult
    = producer.send(msg, new LocalTransactionExecuter()
    {
        @Override public TransactionStatus execute
        (Message msg, Object arg) {
            // 消息 ID
            (有可能消息体一样,但消息 ID 不一样,
            当前消息 ID 在控制台无法查询)
            String msgId = msg.getMsgID();
            // 消息体内容进行 crc32,也可以使用其它的如
            MD5 long crc32Id = HashUtil.crc32Code
            (msg.getBody());
            // 消息 ID 和 crc32id
            主要是用来防止消息重复 // 如果业务本身是幂等的,
            可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
            // 如果要求消息绝对不重复,推荐做法是
            对消息体 body 使用 crc32 或 MD5 来防止重复消息
            Object businessServiceArgs = new Object();
            TransactionStatus transactionStatus
            =TransactionStatus.Unknow;
            try {
                Boolean isCommit =
                businessService.execbusinessService
                (businessServiceArgs);
                if (isCommit)
                {
                    // 本地事务成功则提交消息 transactionStatus = TransactionStatus.CommitTransaction; } else {
                        // 本地事务失败则回滚消息
                        transactionStatus = TransactionStatus.
                        RollbackTransaction;
                    }
                }
                catch (Exception e)
                {
                    log.error("Message Id:{}", msgId, e);
                }
                System.out.println(msg.getMsgID());
                log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
                return transactionStatus;
            }
        }
        , null);
    }
    catch (Exception e) {
        // 消息发送失败,
        需要进行重试处理,可重新发送这条消息或
        持久化这条数据进行补偿处理 System.out.println
        (new Date() + " Send mq message failed.
Topic is:" + msg.getTopic());
        e.printStackTrace();
    }

所有消息发布原理图

producer完全无状态,可以集群部署。

3.3 Name Server集群

NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步,NameServer很像注册中心的功能。

听说阿里之前的NameServer 是用ZooKeeper做的,可能因为Zookeeper不能满足大规模并发的要求,所以之后NameServer 是阿里自研的。

NameServer其实就是一个路由表,他管理Producer和Comsumer之间的发现和注册。

3.4 Broker集群

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName。

不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。

3.5 Consumer集群

订阅方式

消息队列 RocketMQ 支持以下两种订阅方式:

集群订阅:同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

// 集群订阅方式设置(不设置的情况下,
默认为集群订阅方式)properties.put
(PropertyKeyConst.MessageModel,
PropertyValueConst.CLUSTERING);

广播订阅:同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

// 广播订阅方式设置properties.put
(PropertyKeyConst.MessageModel,
PropertyValueConst.BR
OADCASTING);

订阅消息关键代码:

Consumer consumer = ONSFactory.create
Consumer(properties);
consumer.subscribe
("TopicTestMQ", "TagA||TagB", **new**
MessageListener() {
    //订阅多个
    Tag public Action consume(Message message,
    ConsumeContext context) {
        System.out.println
        ("Receive: " + message);
        return Action.
        CommitMessage;
    }
}
);
//订阅另外一个 Topic
consumer.subscribe("TopicTestMQ-Other",
"*", **new** MessageListener()
{
    //订阅全部 Tag public Action consume
    (Message message, ConsumeContext context)
    {
        System.out.println("Receive: " + message);
        return Action.CommitMessage;
    }
}
);
consumer.start();

注意事项:

消费端要做幂等处理,所有MQ基本上都不会做幂等处理,需要业务端处理,原因是如果在MQ端做幂等处理会带来MQ的复杂度,而且严重影响MQ的性能。

消息收发模型

主子账号创建

创建主子账号的原因是权限问题。下面是主账号创建流程图

子账号流程图

四、MQ是微服务架构

非常重要的部分

MQ的诞生把原来的同步架构思维转变到异步架构思维提供一种方法,为大规模,高并发的业务场景的稳定性实现提供了很好的解决思路。

Martin Fowler强调:分布式调用的第一原则就是不要分布式。这句话看似颇具哲理,然而就企业应用系统而言,只要整个系统在不停地演化,并有多个子系统共同存在时,这条原则就会被迫打破。

Martin Fowler提出的这条原则,一方面是希望设计者能够审慎地对待分布式调用,另一方面却也是分布式系统自身存在的缺陷所致。

所以微服务并不是万能药,适合的架构才是最好的架构。

# 链接 Java程序员福利"常用资料分享"

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容