1、RabbitMQ 参考
1.1 原理
- vhost(虚拟主机):每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制;
- Channel(信道):消息推送使用的通道,信道是生产消费者与rabbit通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。
- Exchange(交换机):Exchange 用于转发消息,但是它不会做存储,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息;
- RoutingKey(路由键):用于把生成者的数据分配到交换器上;
- BindingKey(绑定键):用于把交换器的消息绑定到队列上;
-
Queue(队列):用于存储生产者的消息;
总结:信道才是rabbit通信本质,生产者和消费者都是通过信道完成消息生产消费的;交换器本质是一张路由查询表(名称和队列id,类似于hash表),这是一个虚拟出来的东西,并不存在真实的交换器。
消息的生命周期:生产者生产消息A 交由信道,信道通过消息(消息由载体和标签)的标签(路由键)放到交换器发送到队列上(其实就是查询匹配,一旦匹配到了规则,信道就直接和队列产生连接,然后将消息发送过去)
1.2 RabbitMQ交换机的种类和队列
-
Work模式
一个生产者、2个消费者。一个消息只能被一个消费者获取。
轮询分发 :在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
公平分发 :关闭自动应答,改为手动应答,basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。两种分发对比 -
路由模式
Direct: 1:1类似完全匹配,direct 类型的行为是"先匹配, 再投送",即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去;如上图,当生产者发送error路由键的消息时都能收到消息,当发送info路由键消息时只有C2能收到。
-
主题模式(通配符模式)
Topic: N:1,按规则转发消息(最灵活),多个交换器可以路由消息到同一个队列。根据模糊匹配,比如一个队列的routing key 为*.test ,那么凡是到达交换器的消息中的routing key 后缀.test都被路由到这个队列上;
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
-
订阅模式
Fanout:1:N,:实际上就是广播,发送到fanout交换器的消息,会被转发给所有和这个交换器绑定的队列;
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
- Headers:设置header attribute参数类型的交换机。
2、ActiveMQ工作模式:
3、kafka 原文
3.1 kafka发送消息流程
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。
3.1.1 生产消息
创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
如果partition没填,那么情况会是这样的:
- key有填,按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
- key没填,round-robin来选partition
3.1.2 消费
订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
一个消费组消费partition,需要保存offset记录消费到哪,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。
3.2 partition
partion可以看作一个有序的队列,里面的数据是储存在硬盘中的,追加式的。partition的作用就是提供分布式的扩展,一个topic可以有许多partions,多个partition可以并行处理数据,所以可以处理相当量的数据。只有partition的leader才会进行读写操作,folower仅进行复制,客户端是感知不到的。
当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。
partition的分配:
a. 将所有Broker(假设共n个Broker)和待分配的Partition排序
b. 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
c. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
3.2 kafka分区与消费者的关系
生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息。
在server.properties配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认是1。每个主题也可以设置自己的分区数量。
生产者将消息投递到分区的规律,默认的分区策略:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
- 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
3.3 kafka工作方式
Consumer Group:这是kafka用来实现一个topic消息的广播和单播的手段,一个topic可以有多个group,topic的消息会发送给所有的group,但每个group只会把消息发给该group中的一个consumer。
- 如果要实现消息广播,每个consumer需要有一个独立的group;
- 如果要实现单播只要所有的consumer在同一个group;
3.4 Kafka的零拷贝技术 原文
如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。
3.5 kafka速度为什么这么快 参考
它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
- 写入数据:
Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile 。
因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件 ,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。 - 读取数据
基于sendfile实现Zero Copy
批量压缩,如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
4、使用消息队列的优缺点
优点:
-
异步处理
-
应用解耦
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。
-
流量削锋
缺点:
系统可用性降低
系统引入的外部依赖越多,越容易挂掉。系统复杂度提高
引用消息队列后需要保证消息重复消费,消息丢失,消息传递的顺序性等。一致性问题
BCD 三个系统,BD 两个系统写库成功了,结果 C 系统写库失败了,数据就不一致了。
5、Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
6、如何保证消息队列的高可用?
6.1RabbitMQ 的高可用性
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
- 普通集群模式(无高可用性)
创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。 - 镜像集群模式(高可用性)
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。坏处在于,第一,性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重;第二,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。
6.2Kafka 的高可用性
7、如何保证消息不被重复消费(如何保证消息的幂等性)?
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
解决办法:参考
Rabbitmq:
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
kafka:
每个消息写入partition后会有一个offest,消费者消费数据以后每隔一段时间会把自己消费的offest提交,但如果没提交时消费者宕机,重启后会发生重复消费就无法保证幂等性。要保证消息的幂等性需结合具体的业务,比如插数据库先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
8、如何保证消息不丢失
Rabbit消息丢失问题
-
生产者丢失数据
开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
开启 confirm 模式,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个** nack 接口**,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息。 -
RabbitMQ丢失数据
开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失。
设置持久化有两个步骤:
创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
发送消息的时候将消息的 deliveryMode 设置为 2
持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。 -
消费端弄丢了数据
关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把
kafka消息丢失问题
-
生产者丢失数据
kafka的消息发送机制分为同步和异步机制。可以通过producer.type属性进行配置。
同步模式的时候,有三种状态来保证消息的安全生产。可以通过配置request.required.acks属性。0—表示不进行消息接收是否成功的确认;1—表示当Leader接收成功时确认;-1—表示Leader和Follower都接收成功时确认;同步模式下只需要将确认机制设置为-1,让消息写入leader和所有的副本,就可以保证消息安全生产。
异步模式的时候,生产者会在本地缓冲消息,并适时批量发送。当缓冲区满了,acks=0的时候,不需要进行消息接受是否成功的确认,所以会自动清空缓冲池里的消息。则需要在配置文件中,将阻塞超时的时间设置为不限制。这样生产端会一直阻塞。可以保证数据不丢失。我们需要设置block.on.buffer.full = true。 这样producer将一直等待缓冲区直至其变为可用。缓冲区满了就阻塞。 - 保证消费者消息不丢失
kafka的consumer模式是自动提交位移的。我们只需要在代码逻辑中保证位移提交前消息被处理就行。我们可以关闭自动提交位移,设置enable.auto.commit为false。自己手动处理消息后提交位移。
9、如何保证消息的顺序性?
消息错乱的场景:
- RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。
- Kafka:一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。
解决方案:
- Rabbitmq:拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
- kafka:
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue,然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。