一、啥时候用?用了会有啥提升,有啥问题
日志的采集
通过kafka+elk;
消息系统
协调系统间业务、生产者和消费者的解耦、缓存消息
消息广播,发布 / 订阅系统实现一个微服务级系统间的观察者模式
用户活动的追踪
用户在web或者App的活动行为,通过消费kafka的topic做分析或者存储;
限流、削峰填谷
对激增场景限流;电商商城、订单、支付;
流式处理
连接计算任务和数据、流式计算框架spark streaming 和 storm等。
总结需求:
1.需要异步处理,解耦
2.流量控制
3.服务通信
4.日志收集
引入存在的缺点:
1.引入消息队列带来的延迟问题;
2.增加了系统的复杂度;
3.可能产生数据不一致的问题,如要保持强一致性,需要高代价的补偿(如分布式事务、对账);
4.有数据丢失风险,如宕机重启,如要保证队列数据可用,需要额外机制保证(如双活容灾);
意识到消费队列的优劣,取舍,具体问题具体分析。
二、怎么选择消息队列?
1.必须是开源的产品,有bug自己上,处理或者规避一下bug
资料多比较活跃的产品,不要冷门,集成比较OK的。
比如:kafka和flink ,flink内置kafka 的Data Source,flink-connector-kafka
2.消息传输可靠,尽量保证不丢消息
3.支持集群的,不会一个节点挂,就都玩完的
4.具有足够好的性能,满足大多数场景的性能要求
以下为主流消息队列产品:
1.RabbitMQ
Messaging that just works,“开箱即用的消息队列”,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块。
可以理解为交换机,根据配置的路由规则,将生产者发出的消息分发到不同的队列中。路由的规则也非常灵活,甚至你可以自己来实现路由规则,多语言的客户端,提供消息跟踪机制和插件扩展,对数据持久化和负载均衡有较好的支持,适合于重量级的企业业务应用。
缺点:消息堆积处理的不好,设计理念,队列是一个管道,大量堆积是一种不正常的情况,需要规避处理。一堆积就废废,性能大幅度下降
性能:每秒处理 1W~10W+ 消息
2.RocketMQ
阿里巴巴在 2012 年开源的消息队列产品,后来赠予apache,2017年正式毕业。
有活跃的中文社区基本有问题都能搜到,java开发且性能很好,大多数情况情况毫秒级响应。在意延迟的可以优先考虑。
支持push和pull的消费模式,单一队列支持百万级的处理消息堆积能力,支持JMS、MQTT等的消息协议,至少一次的消息传递语义,同时支持云集群的部署
缺点:国外不大流行,生态集成比较少。
性能:每秒几十万
3.Kafka
由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica)、基于zookeeper协调的分布式流平台,Apache 的顶级项目。Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。
设计上大量使用了批量和异步的思想,这种设计使得 Kafka 能做到超高的性能。
缺点:它的同步收发消息的响应时延比较高,比较多的地方都会使用攒一波再一起处理。不太适合在线
性能:每秒几十万,和RocketMQ差不多
以下为二梯队消息队列产品:
1.ActiveMQ
十年前唯一开源的老牌消息队列,步入老年期,社区不活跃。性能,功能落后,存在意义可能只是兼容爷爷辈的系统。
2.ZeroMQ
严格来说不是个消息队列,基于消息队列的多线程网络库,可将消息队列功能集成到系统进程中。
支持Java、C++等30多种开发语言,非持久性,宕机就GG
3.Redis
不仅可以作为缓存的处理器,还可以用来做消息队列,它的列表类型适合轻量级的消息队列使用。当入队数据量比较小的时候,Redis的性能高于rabbitMQ,但是数据大于10K之后就变得很慢;在出队的时候无论数据多少,都有很好的性能。
但是,不太行,如果作为消息队列,消息一发布,就把消息都推送给订阅者了,然后进行删除操作,数据在内存保存的时间很短。
list做队列,rdb或者aof会进行持久化,但是同样也会丢数据,GG
4.Pulsar
一个新兴的开源消息队列产品,最早是由 Yahoo 开发,目前处于成长期,流行度和成熟度相对没有那么高
总结:
对功能性能没有高要求的,rabbitmq就可以,开箱即用易于维护。
处理在线功能业务,交易系统,商城订单的用rocketmq,低延迟,高稳定性。
海量信息处理,日志收集,监控,处理埋点的场景,以及大数据、流计算相关产品用kafka
如果以上场景都不符合,挑个自己最熟悉的用吧,快速上手,稳的一批。
三、kafka的主要成分
producer 消息生产者,向broker发消息的客户端;
consumer 消息的消费者,从broker获取消息并消费;
broker 存储消息的中间节点,一台服务器就是broker,一个broker包含多个topic。多个broker可以组成集群。
topic 主题,一类消息,可以是日志等通过topic来存储;
partition 分区, topic上的一个物理分组,一个topic可以有多个,每个partition都是有顺序的队列。提高并行处理能力;
segment 段, partition上的物理存储由多个大小相等的segment(段)文件构成,但是段上的消息数量不一定相等,方便无用文件的快速删除,提升磁盘空间的利用率。segment的文件生命周期由服务端的配置参数决定。
message 消息,segment由很多message组成;
offset 偏移,partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息message都有一个连续的序列号ID叫做offset,用于partition唯一标识一条消息。通过segment和offset可以查询到对应的消息。
四、kafka的消息发送过程
首先获取topic的所有的partition,如果客户端不指定partition,同时也没有指定的key,使用自增的数字取余数的方法获取指定的partition。这样就是平均的向partition生产数据。
如果需要控制发送的partition,可以通过两种方式,一种是指定的partition,另一种是根据自己设置key的算法。继承partition的接口实现其方法。
五、consumer的消费机制
kafka通过broker持久化数据,不需要进行缓存处理,消费者通过pull的机制,定期从服务器拉取数据。不同的consumer group中的消费者可以消费partition相同的消息,相同的group下的消费者只能消费partition中不同的数据信息。通过记录每个消费者在各个topic下的partition的消费offset,每次pull数据的时候都是从上次记录的位置开始拉取数据。
六、kafka的高性能IO实现
1.使用批量消息处理提升服务端处理能力
Kafka客户端SDK中,Kafka 的 Producer 只提供了单条发送的 send() 方法,并没有提供任何批量发送的接口。原因是,Kafka 根本就没有提供单条发送的功能,虽然它提供的 API 每次只能发送一条消息,但实际上,Kafka 的客户端 SDK 在实现消息发送逻辑的时候,采用了异步批量发送的机制。
当你调用 send() 方法发送一条消息之后,无论你是同步发送还是异步发送,Kafka 都不会立即就把这条消息发送出去。它会先把这条消息,存放在内存中缓存起来,然后选择合适的时机把缓存中的所有消息组成一批,一次性发给 Broker。简单地说,就是攒一波一起发。
Broker这一端咋处理呢?拆了一条条处理太慢,那就一批当一条,无论是读写磁盘,或者复制到其他副本,批消息都不会解开,一条“批消息”进行处理的。
2.使用顺序读写提升磁盘IO性能
对于磁盘来说,它有一个特性,就是顺序读写的性能要远远好于随机读写。在 SSD(固态硬盘)上,顺序读写的性能要比随机读写快几倍,如果是机械硬盘,这个差距会达到几十倍。
操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写。顺序读写相比随机读写省去了大部分的寻址时间,它只要寻址一次,就可以连续地读写下去,所以说,性能要比随机读写要好很多。
kafka对于每个分区,把从 Producer 收到的消息,顺序地写入对应的 log 文件中,一个文件写满了,就开启一个新的文件这样顺序写下去。消费的时候,也是从某个全局的位置开始,也就是某一个 log 文件中的某个位置开始,顺序地把消息读出来。
3.利用 PageCache 加速消息读写
PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存,无论我们使用什么语言编写的程序,在调用系统的 API 读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。清理的策略一般是 LRU 或它的变种算法
Kafka 在读写消息文件的时候,充分利用了 PageCache 的特性。
一般来说,消息刚刚写入到服务端就会被消费,按照 LRU 的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的 PageCache,命中的几率会非常高。也就是说,大部分情况下,消费读消息都会命中 PageCache。
带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的 IO 资源,间接也提升了写入的性能
4.ZeroCopy:零拷贝技术
处理消费逻辑:
先从文件读数据到内存,然后把消息通过网络传输给客户端,起码2次~3次复制
1.文件到pagecache,如果命中不用复制
2.从pagecache 到应用程序内存中,可操作对象的内存。
3.从应用程序内存到Socket缓冲区,网络传输
零拷贝可以把2.3步骤合并一次,直接pageCache 到Socket缓冲区中,减少一次复制,由于不用把数据复制到用户内存空间,
DMA 控制器(CPU和DMA控制接口逻辑芯片共同组成,嵌入式系统的DMA控制器内建在处理器芯片内部)可以直接完成数据复制,不需要 CPU 参与,速度更快。
零拷贝对应的系统调用:ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
前两个参数分别是目的端和源端的文件描述符,后面两个参数是源端的偏移量和复制数据的长度,返回值是实际复制数据的长度
七、消息可靠性
消息传递有这几种情况:
At least once(默认)
至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
At most once
至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息
一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失
Exactly once
恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
丢消息:
检查是否丢消息,可以在producter端给每个消息附加一个连续递增的序号,在消费端验证连续性。
验证多实例,集群的话,在producer分别生产序号,每个producer加上标示 。在消费端按照表示查看连续性。
防止丢消息,这个需要producer 和consumer 两端保证的。
kafka 发消息,一旦这条消息被commit,因为replication拷贝副本的存在就不会丢失,每个分区数据提供多个副本