主流消息中间件介绍

Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格的要求,适合产生大量数据的互联网服务的数据收集业务。
Kafka高性能的读写主要是借力于操作系统底层的PageCache。

kafka集群模式

Kafka集群模式.png

Kafa关注与性能,对数据可靠性要求不是很高。

RocketMQ

阿里开源的,具有高吞吐量、高性能、适合大规模分布式系统的应用。目前在阿里集团被广泛引用于交易、充值、流计算、消息推送、日志流式处理。优点:能够保证顺序消费,提供了丰富的消息拉取和处理的模式,方便水平扩展,实时的消息订阅的机制,承载上亿级别消息的能力。RocketMQ借鉴了Kafka。数据存储关键的两个文件commit log,consume queue。使用Name server做集群的协调和管理,原因是阿里觉得Zookeeper的性能太低了,Name server的源码非常精简。RocketMQ的集群架构有很多种,比如说主从模式、双Master模式、双主双从模式、多主多从模式,RocketMQ刷盘策略也很多,比如同步双写,异步复制 。
RocketMQ的维护是一个痛点,需要非常专业的运维团队。

RabbitMQ

RabbitMQ是使用Erlang语言开发的,性能是非常好的,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,可以使用镜像队列,对性能和吞吐量的要求还在其次。


RabitMQ高可用负载均衡集群.png

第二章 RabbitMQ核心概念及AMQP协议

2.1 本章导航

为什么选择RabbitMQ?
RabbitMQ的高性能之道是如何做到的?
什么是AMQP高级协议?
AMQP核心概念是什么?
RabbitMQ整体架构模型是什么样子的?
RabbitMQ消息是如何流转的?
RabbitMQ消息生产与消费
RabbitMQ交换机详解
RabbitMQ队列、绑定、虚拟主机、消息

2.2 初识RabbitMQ

RabbitMQ是开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于uAMQP协议的。
为什么选择RabbitMQ?
开源,社区活跃,性能优越,稳定性保障;提供可靠性消息投递(confirm)、返回模式(return)。
与SpringAMQP完美的整合,API丰富。
集群模式丰富,表达式配置、HA模式,镜像队列模型。
保证数据不丢失的前提下做到高可靠性、可用性。

RabbitMQ高性能的原因
使用Erlang语言开发,这使得RabbitMQ在Broker之间的数据交互的性能是非常优秀的。Erlang语言的优点:Erlang有着和原生Socket一样的延迟。

2.3 什么是AMQP 高级消息队列协议

AMQP全称:Advanced Message Queuing Protocol
定义:具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向对象的中间件设计。
AMQP协议模型

AMQP协议模型.png

Virtual Host : 虚拟主机
Exchange:交换机
Message Queue:交换机

AMQP核心概念
Server:又称Broker,接受客户端的连接,实现AMQP实体服务。
Connection:连接,应用程序和Broker之间的网络连接。
Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。
Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体的内容。
Virtual Host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。
Exchange:交换机,接收消息,根据消息的路由键转发消息到绑定的队列。
Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。
Routing Key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。
Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

2.4 RabbitMQ的整体架构

RabbitMQ的整体架构.png

消息生产者投递消息到Exchange,消费者监听Message Queue。
一个Exchange可以绑定多个Queue。

2.5 RabbitMQ安装和使用

2.6 Exchange交换机

交换机类型:direct,fanout,headers,topic
Durability:是否需要持久化,true为持久化。
Auto Delete:当Exchange上绑定的队列都删除后,这个Exchange就会自动删除。
Internal:当前Exchange是否为RabbitMQ内部使用,默认是false。

2.7消息Message

本质上是有Properties和Playload组成。
常用属性delivery mode,headers(自定义属性)。
其他属性:content_type,content_encoding,priority
correlation_id(业务和时间字符串的拼接,做成唯一的id,做消息ack,路由,幂等啊),reply_to(消费失败返回到哪个队列),expiration(过期时间),message_id(消息id),timestamp,user_id,app_id,cluster_id等。

2.8 虚拟主机

虚拟地址,用于进行逻辑隔离,最上层的消息路由。
一个Virtual Host里面可以有若干个Exchange和Queue。
同一个Virtual Host里面不能有相同名称的Exchange或Queue。

3 RabbitMQ高级特性

消息如何保障100%的投递成功?
消息幂等性概念详解。
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
*Confirm确认消息,Return返回消息。
自定义消费者。
消息的Ack与重回队列
消息限流
TTL消息
死信队列

3.1 消息如何保障100%投递成功

什么是生成端的可靠性投递?

  • 保障消息成功的发送出去
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认回答
  • 完善的消息进行补偿机制
    BAT、TMD互联网大厂的解决方案:
  • 消息落库,对消息状态进行打标(持久化到数据库,适合并发不高的场景)。
生产端可靠性投递
  • 消息的延迟投递,做二次确认,回调检查。


    消息延迟投递-回调确认

消息延迟投递,回调确认的目的是为了在主业务流程消除消息的入库操作,提高系统的吞吐量。Step2和Step1发送的是相同的消息,并且同时发出,不同之处在于Step2投递的是延迟消息。Step3消费者消费完消息后,进入Step4-向一个对列发送消息消费成功的消息,CallbackService的Step5和Step4监听同一个队列,CallbackService监听到消费成功的消息后,将消息持久化到数据库。Step6监听到延迟消息后,去数据库比对该消息是否消费成功,如果消费成功就忽略;如果没有消费成功,就会向上游服务发送RPC通信,告诉延迟检查的消息没有找到,上游服务根据id去业务库中查找业务记录重新拼接消息进行再投递。

3.2 幂等性的概念

我们先看下数据库的乐观锁机制
比如我们执行一条更新库存的SQL语句:

UPDATE t_reps SET count = count - 1, VERSION = VERSION + 1 WHERE VERSION = 1

我们来分析减库存是如何能应对高并发场景的。更新库存之前要查出当前库存的版本号,更新库存的同时更新版本号(where中带着版本号去作为更新条件)。假如有a,b两个线程同时来更新库存,如果a,b查出的版本号都是1,那么如果a线程更新库存成功的同时版本号也会更新为2,这时候b线程就不能再做减库存的操作了。
幂等性的含义是:一个操作不管并行执行多少次,它的结果永远是相同的。

在海量订单产生的业务高峰期,如何去避免消息的重复消费问题呢?

消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了一条同样的消息多次。

业界主流的幂等性操作:

  • 唯一ID+指纹码机制,利用数据库主键去重。
    指纹码可能是一些业务规则,并不是系统生成的,它的目的就是为了保障这次操作绝对是唯一的。ID+指纹码拼接作为唯一的主键。
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID+指纹码

先去查看唯一ID+指纹锁是不是唯一的,如果数据库中已经有了,表明消息已经消费过,就可以忽略了;如果没有,就要进行消费。
优点:实现简单。
坏处:高并发下有数据库写入的性能瓶颈。
解决方案:跟进ID进行分库分表进行算法路由。

利用Redis的原子性去实现
使用Redis进行幂等,需要考虑的问题。

  1. 第一:我们是否要进行数据库落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?

比如,过来了一条消息,我用Redis的原子性把它过滤掉了。如果没有过滤掉,要进行的操作有 (1)redis中set订单id(2)在数据库中存储订单信息,这时候的关键点就是:如何做到数据库和redis缓存之间的一致性,怎么做到同时成功,同时失败。

  1. 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
    放在缓存之中就能100%的成功吗?

3.3 Confirm确认消息

理解Confirm消息确认机制:

  • 消息确认,指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!
    如何实现Confirm确认消息?
  • 第一步:在channel上开启确认模式:channel.confirmSelect()
  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!什么时候会导致no ack呢?比如磁盘写满了、比如RabbitMQ出现了异常、queue的容量到达上限了。如果网络发生抖动,就有可能ack和no ack都没有处理到,这时候就要利用分布式定时任务去重发消息了。

3.4 Return消息机制

  • Return Listener用于处理一些不可路由的消息!
  • 我们的消息生产者,通过制定Exchange和RoutingKey,把消息发送到某个队列中去,然后我们的消费者监听队列,进行消费处理!
  • 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由Key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener!
  • 在基础API中有一个关键的配置项:Mandatory如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果为false,那么broker端会自动删除该消息,默认值是false!
  • addReturnListener

3.5 死信队列

死信队列:DLX,Dead-Letter-Exchange
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

*消息变成死信的几种情况

  • 消息被拒绝(basic.reject/basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

死信队列详细描述

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

死信队列设置

  • 首先需要设置死信队列的exchange和queue,然后进行绑定:
    Exchange: dlx.exchange
    Queue:dlx.queue
    RoutingKey:#
    只要有消息到达了这个队列,都能路由到队列。
  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange", "dlx.exchage")。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • ActiveMQ ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线,并且它是一个完全支持JMS...
    若兮缘阅读 10,227评论 0 10
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,860评论 2 11
  • 有很多人问过我这么一类问题:RabbitMQ如何确保消息可靠?很多时候,笔者的回答都是:说来话长的事情何来长话短说...
    Java机械师阅读 486评论 0 0
  • 利用RabbitMQ集群横向扩展能力,均衡流量压力,让消息集群的秒级服务能力达到百万,Google曾做过此类实验;...
    有货技术阅读 3,442评论 0 1
  • 五月一曰劳动节终于来到了!这一天我期盼了许久,在今年的小长假中,每一天我都过的充实,快乐!每一天我都能看到...
    小羽_235c阅读 169评论 0 0