消息中间件

1、RabbitMQ 参考

1.1 原理
image.png

  • vhost(虚拟主机):每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制;
  • Channel(信道):消息推送使用的通道,信道是生产消费者与rabbit通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。
  • Exchange(交换机):Exchange 用于转发消息,但是它不会做存储,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息;
  • RoutingKey(路由键):用于把生成者的数据分配到交换器上;
  • BindingKey(绑定键):用于把交换器的消息绑定到队列上;
  • Queue(队列):用于存储生产者的消息;
    总结:信道才是rabbit通信本质,生产者和消费者都是通过信道完成消息生产消费的;交换器本质是一张路由查询表(名称和队列id,类似于hash表),这是一个虚拟出来的东西,并不存在真实的交换器。
    消息的生命周期:生产者生产消息A 交由信道,信道通过消息(消息由载体和标签)的标签(路由键)放到交换器发送到队列上(其实就是查询匹配,一旦匹配到了规则,信道就直接和队列产生连接,然后将消息发送过去)

1.2 RabbitMQ交换机的种类和队列

  • Work模式

    work

    一个生产者、2个消费者。一个消息只能被一个消费者获取。
    轮询分发 :在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
    公平分发 :关闭自动应答,改为手动应答,basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。两种分发对比

  • 路由模式

    路由模式

Direct: 1:1类似完全匹配,direct 类型的行为是"先匹配, 再投送",即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去;如上图,当生产者发送error路由键的消息时都能收到消息,当发送info路由键消息时只有C2能收到。

direct

  • 主题模式(通配符模式)
    image.png

    image.png

Topic: N:1,按规则转发消息(最灵活),多个交换器可以路由消息到同一个队列。根据模糊匹配,比如一个队列的routing key 为*.test ,那么凡是到达交换器的消息中的routing key 后缀.test都被路由到这个队列上;

image.png

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

  • 订阅模式
    订阅模式

Fanout:1:N,:实际上就是广播,发送到fanout交换器的消息,会被转发给所有和这个交换器绑定的队列;

image.png

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

  • Headers:设置header attribute参数类型的交换机。

2、ActiveMQ工作模式:
image.png

3、kafka 原文

3.1 kafka发送消息流程

image.png

图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。

3.1.1 生产消息

image.png

创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
如果partition没填,那么情况会是这样的:

  • key有填,按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
  • key没填,round-robin来选partition

3.1.2 消费

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
一个消费组消费partition,需要保存offset记录消费到哪,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

3.2 partition

partition

partion可以看作一个有序的队列,里面的数据是储存在硬盘中的,追加式的。partition的作用就是提供分布式的扩展,一个topic可以有许多partions,多个partition可以并行处理数据,所以可以处理相当量的数据。只有partition的leader才会进行读写操作,folower仅进行复制,客户端是感知不到的。

当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。

partition的分配:
a. 将所有Broker(假设共n个Broker)和待分配的Partition排序
b. 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
c. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

3.2 kafka分区与消费者的关系

生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息。
在server.properties配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认是1。每个主题也可以设置自己的分区数量。

生产者将消息投递到分区的规律,默认的分区策略:

  • 如果在发消息的时候指定了分区,则消息投递到指定的分区
  • 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
  • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区

3.3 kafka工作方式

Consumer Group:这是kafka用来实现一个topic消息的广播和单播的手段,一个topic可以有多个group,topic的消息会发送给所有的group,但每个group只会把消息发给该group中的一个consumer。

  • 如果要实现消息广播,每个consumer需要有一个独立的group;
  • 如果要实现单播只要所有的consumer在同一个group;

3.4 Kafka的零拷贝技术 原文

image.png

image.png

如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。

3.5 kafka速度为什么这么快 参考

它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

  • 写入数据:
    Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile
    因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
    Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件 ,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
  • 读取数据
    基于sendfile实现Zero Copy
    批量压缩,如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩

4、使用消息队列的优缺点

优点:

  • 异步处理



  • 应用解耦




    总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。

  • 流量削锋




    缺点:

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。

  • 系统复杂度提高
    引用消息队列后需要保证消息重复消费,消息丢失,消息传递的顺序性等。

  • 一致性问题
    BCD 三个系统,BD 两个系统写库成功了,结果 C 系统写库失败了,数据就不一致了。

5、Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

6、如何保证消息队列的高可用?

6.1RabbitMQ 的高可用性

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

  • 普通集群模式(无高可用性)
    创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。
  • 镜像集群模式(高可用性)
    创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。坏处在于,第一,性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重;第二,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。

6.2Kafka 的高可用性

7、如何保证消息不被重复消费(如何保证消息的幂等性)?

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
解决办法:参考

Rabbitmq:
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。

kafka:
每个消息写入partition后会有一个offest,消费者消费数据以后每隔一段时间会把自己消费的offest提交,但如果没提交时消费者宕机,重启后会发生重复消费就无法保证幂等性。要保证消息的幂等性需结合具体的业务,比如插数据库先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

8、如何保证消息不丢失

Rabbit消息丢失问题

  • 生产者丢失数据
    开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
    开启 confirm 模式,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个** nack 接口**,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
    事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息。
  • RabbitMQ丢失数据
    开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失。
    设置持久化有两个步骤:
    创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
    发送消息的时候将消息的 deliveryMode 设置为 2
    持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。
  • 消费端弄丢了数据
    关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把
    image.png

kafka消息丢失问题

  • 生产者丢失数据
    kafka的消息发送机制分为同步和异步机制。可以通过producer.type属性进行配置。
    同步模式的时候,有三种状态来保证消息的安全生产。可以通过配置request.required.acks属性。0—表示不进行消息接收是否成功的确认;1—表示当Leader接收成功时确认;-1—表示Leader和Follower都接收成功时确认;同步模式下只需要将确认机制设置为-1,让消息写入leader和所有的副本,就可以保证消息安全生产。
    异步模式的时候,生产者会在本地缓冲消息,并适时批量发送。当缓冲区满了,acks=0的时候,不需要进行消息接受是否成功的确认,所以会自动清空缓冲池里的消息。则需要在配置文件中,将阻塞超时的时间设置为不限制。这样生产端会一直阻塞。可以保证数据不丢失。我们需要设置block.on.buffer.full = true。 这样producer将一直等待缓冲区直至其变为可用。缓冲区满了就阻塞。
  • 保证消费者消息不丢失
    kafka的consumer模式是自动提交位移的。我们只需要在代码逻辑中保证位移提交前消息被处理就行。我们可以关闭自动提交位移,设置enable.auto.commit为false。自己手动处理消息后提交位移。

9、如何保证消息的顺序性?

消息错乱的场景:

  • RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。
  • Kafka:一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
    消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

解决方案:

  • Rabbitmq:拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
  • kafka:
    一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
    写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue,然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容