我所知道的kafka

cluster

kafka的cluster是基于zookeeper

publisher(发布者)

kafka publisher配置项"boostrap.servers"目的只是为了初始化连接时,从其中的某一个可用server上获取整个cluster所有成员。

如何创建topic

会在broker上随机选择一个创建topic(保证所有的topic的partition均匀的落在所有broker上),然后根据创建topic的命令中所带的partition(如果没有,则采用broker的配置默认值num.partitions)在不同的broker上创建partition,若partition多于broker,多个partition在同一个broker也是可以的。根据创建命令参数replication-factor来判断创建几个replication(未配置则采用broker的配置默认值default.replication.factor),采取增量右移的方式为每个partition创建replication。

如何向topic发消息

如果构建的topic中填入key参数,则根据key hash值hash到某个partition,若key为null,则每次发送会按照round-robin的方式将message均匀发送到每个partition上

client.id

producer和consumer端都有配置,可以用来区分消息来源

一次消息发送的完成

发送消息在producer是异步的,返回的只是一个future

public Future<RecordMetadata> send(ProducerRecord<K, V> record);

但future的结果依赖于kafka server上producer的配置request.required.acks的值,当=0时,不保证消息的到达确认,只管传送,server上并不等执行完log落地返回而是直接返回,这样如果server down了,就有可能丢失数据。当=1时,安全系数较高,当收到leader确认后,才返回,这个有一定的可靠性,但是它不等其他replia的确认结果,如果leader挂了,其他的replia还没来得及同步,也可能丢失数据。当=-1时,安全系数最高,等到leader以及所有的replia都确认了消息才返回,这样即使leader挂了,只要有一台replia可以运行,也不会丢失数据,但这种也造成了最高的延时。这是在server端的处理,但是在producer端,同步异步的处理还取决于参数producer.type,每个produer在连上server时,都会把配置读过来,当=sync时,producer.send是直接调用SyncProduer.send,等同于server端的结果有了才返回,如果=async,初始化producer时会创建一个ProducerSendThread,producer.send是把消息丢进thread的队列(队列最大长度queue.buffering.max.messages决定)里,每隔一定时间(queue.buffering.max.ms)或者队列长度超过定值(batch.num.messages)时批量发送,这样以提高吞吐量。

Q: async send的详情

A: 首先还是根据有无key来定位partition位置,每个partition有各自的queue,插入后会校验是否达到batch发送的条件,达到了,也是采用nio的机制,selector.wakeup唤醒和各个partition连接的socket执行批量发送。

server端消息的管理

消息何时被写入磁盘

kafka config中有三个配置项来定义这个策略(log.flush.interval.messages, log.flush.interval.ms, log.flush.scheduler.interval.ms),在描述这个策略前需要了解一点,linux里调用file.write并不是就直接将信息写入硬盘,根据操作系统的优化策略,会将其存放disk的buffer缓存,直到一定大小或者一定时间后或者直接fsync系统调用来批量将信息真正写入缓存。因此,前两个分别表示当多少条消息被写入buffer或者多久之后调用一次fsync,后面的scheduler表示多久检查一次,当前的消息数和间隔时间是否可以触发fsync了,于是,kafka的消息刷disk机制也就很明显了,并不是每个消息过来的时候检查是否触发fsync而是定时任务检查,否则,要是很久没消息过来,剩下的那部分消息岂不是永远在缓存里不能被触发了..但是,按照kafka文档上的建议,最好是不设置这些配置项,让操作系统自己来判断何时来执行fsync(实际上操作系统并不会执行fsync,操作系统只是有磁盘的延迟写策略,当一定时间间隔或者内存脏页即修改页超过一定数目后,就会把内存的脏页放入io写队列,而fsync则是阻塞到写完成的原语,sync则是将数据丢到io write队列即刻返回,并不等写完成,fdatasync则是保证数据写到disk上,不保证文件信息比如最新修改时间 文件大小之类的metadata的修改,<u>由于fdatasync的性能比fsync好,写日志如果采用fdatasync可以优化性能,但怎样保证文件大小不丢失呢,berkeley db采取的策略是每个log文件固定大小,每次新建后往log写入时从最后开始写,这样新建时log的大小就固定了,每个log只有一次需要同步metadata的开销</u>),这样可以达到性能最大化。而对于consumer端,没有fsync进硬盘没有写入segment文件的消息是不可见不可读的,这样同时有个问题,这是不是就表明了,kafka是有可能丢失消息的呢?当然,我们可以依靠多个replica来减少丢消息的可能,这台server物理挂了但是replica还是能存下,但是在极端情况下,理论上还是可能丢消息的。

消息在server端的保存

消息是被发送至每个partition里的,每个partition是一个文件夹,里面由多个segment文件组成。每个segment文件的保存期限由log.cleaner.enable、log.retention.check.interval.ms、log.retention.minutes和log.retention.bytes共同决定。log.cleaner.enable=true时,文件达到上限不会删除,而会生成压缩文件保存一段时间(有另一套类似的到期删除机制),=false时,达到上限就会删除,log.retention.check.interval.ms设置多久检查一次是否有需要删除的segment文件,log.retention.minutes设置的是log有效期,log.retention.bytes设置的是每个partition的容量,也就是该partition所有segment文件的最大总和。partition下每个segment达到一定大小后就会生成一个新的segment文件,这个大小由log.index.size.max.bytes来配置。partition文件夹的命名规则为topicName+partition序号(0开始),而每个segment文件其实由一个.index和.log文件构成,文件名为64位,20位数字字符offset值(这里的offset值表示在partition里第几个消息)。在.index文件里,格式为index:offset(segment里第几个消息,0开始:字节偏移量offset,0开始),这样就能够很快的索引到指定index的某个消息,比如找offset=174023的消息,先找到最接近的index,比如000000174018.index,然后找其中第四个消息即可。

消息在server端的副本的同步

request.required.acks=-1时,虽然语义上表示每次消息commit是等除leader外其他所有replia有响应时才返回,但是kafka的replia为了保证性能并不是将数据写入log中才响应leader,而是在接收到消息数据就立刻返回,因此当producer commit成功时,很可能replia中的消息还是只在缓存里,并未持久化到硬盘,发生异常后,也不能保证该消息被consumer消费,kafka觉得这在性能和数据持久化之间是一个还不错的权衡。实际上kafka中的同步是follower定时尝试从leader处批量同步(fetch)消息,follower在获取消息过程上就像一个普通的consumer一样,这样能保证kafka集群leader性能,因此producer发送消息到leader,leader commit本地log后,message并没有发ack回去,而是等待所有的ISR内的replia来尝试同步,确认同步完后,返回ack至producer,同时刷新hw.

server端partition多个replia的leader election

每个partition都维护一个ISR(in sync replia),由于kafka的消息同步是在完全同步和完全异步之间,因此follower的消息LEO(log end offset)和leader的有一定的延时性,根据配置replica.lag.max.messages、replica.lag.time.max.ms(如果一段时间内没有新message,follower会主动去leader处fetch,刷新lag time),若follower和leader之间的延时在配置的时间之内,则放在ISR之内(有被选举权),否则就被踢出ISR,之后若同步跟上了,会重新加入ISR。在leader中维护着一个HW(high watermark),标识着当前ISR中的最低延时消息offset,所有的consumer只能消费低于high watermark的message,这样保证了即使ISR里的某台或者某几个server包括leader挂了,只要ISR中还有server存在,消息就不会丢失。而同时,如果leader挂了,被选举为新leader的server将会要求其他replica从high watermark出开始截断,重新从新leader出同步消息重写log,保证所有replica的数据一致性,而当老leader恢复后,也要从HW出开始更新log,必然也会要丢掉一些老log。因此可以想象一个场景,producer发消息到leader时,leader挂了会怎么样?如果request.required.acks>=0,那么就是消息只在leader server本地,那么如果down机在完成前,会返回异常,down机在正常返回后,那么消息丢失。如果request.required.acks=-1的话,如果down机在完成前,返回异常,但也许部分follower已经同步了message了,如果被选为leader,则会导致消息重复发送。如果down机在正常返回后,是不会丢失的,因为虽然leader挂了,但是message已经到了新leader的内存里了,早晚也会落log的。

consumer(消费者)

消费者如何消费

每个consumer可以同时消费多个topic,每个consumer只属于一个指定的consumer group,每个topic的每条message只能发送给每个consumer group的某个consumer,若同一个group中对某个topic有多个consumer,那么这几个consumer将均匀分配partition,很显然,kafka的消息消费只能最多保证同一个partition的有序,不可能保证和producer完全一致的有序。consumer的消费是和replica一样,到leader server中去pull消息,保证server的性能,也保证了轻易实现partition消息获取的有序性。

Q: 消息是否保证at most once或者at least once或者exactly once?

A: 如果消息发送采用producer.type=async的话,at most once,若ack=-1,能保证at least once, exactly once需要靠幂等实现。在consumer端,若获取消息后,消费消息后commit,则有可能消费消息未来得及commit就crash,这样就会重复消费,如果配置成autocommit,则为commit后消费消息,这样有可能有消息没有消费到,所以消费消息后commit同时保证幂等可能是比较靠谱的办法。

感想

每个topic在server端存放多个partition顺序读写是kafka性能强劲的特色所在,但在一定场景下,这个特色也会成为它的性能瓶颈。如果将kafka用于log日志汇聚系统,topic不多,问题不大,若将它用于业务场景的mq,topic随着业务增长越来越多的话,则每个server上不同topic的partition也就越来越多,需要在不同的partition上写入消息,那么写入场景就越来越接近随机写,顺序写硬盘的优势就会越来越小,这也就是为什么kafka比较少作为大公司业务内部mq存在的原因吧。这种场景下,rocketmq应运而生。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容