cluster
kafka的cluster是基于zookeeper
publisher(发布者)
kafka publisher配置项"boostrap.servers"目的只是为了初始化连接时,从其中的某一个可用server上获取整个cluster所有成员。
如何创建topic
会在broker上随机选择一个创建topic(保证所有的topic的partition均匀的落在所有broker上),然后根据创建topic的命令中所带的partition(如果没有,则采用broker的配置默认值num.partitions)在不同的broker上创建partition,若partition多于broker,多个partition在同一个broker也是可以的。根据创建命令参数replication-factor来判断创建几个replication(未配置则采用broker的配置默认值default.replication.factor),采取增量右移的方式为每个partition创建replication。
如何向topic发消息
如果构建的topic中填入key参数,则根据key hash值hash到某个partition,若key为null,则每次发送会按照round-robin的方式将message均匀发送到每个partition上
client.id
producer和consumer端都有配置,可以用来区分消息来源
一次消息发送的完成
发送消息在producer是异步的,返回的只是一个future
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
但future的结果依赖于kafka server上producer的配置request.required.acks的值,当=0时,不保证消息的到达确认,只管传送,server上并不等执行完log落地返回而是直接返回,这样如果server down了,就有可能丢失数据。当=1时,安全系数较高,当收到leader确认后,才返回,这个有一定的可靠性,但是它不等其他replia的确认结果,如果leader挂了,其他的replia还没来得及同步,也可能丢失数据。当=-1时,安全系数最高,等到leader以及所有的replia都确认了消息才返回,这样即使leader挂了,只要有一台replia可以运行,也不会丢失数据,但这种也造成了最高的延时。这是在server端的处理,但是在producer端,同步异步的处理还取决于参数producer.type,每个produer在连上server时,都会把配置读过来,当=sync时,producer.send是直接调用SyncProduer.send,等同于server端的结果有了才返回,如果=async,初始化producer时会创建一个ProducerSendThread,producer.send是把消息丢进thread的队列(队列最大长度queue.buffering.max.messages决定)里,每隔一定时间(queue.buffering.max.ms)或者队列长度超过定值(batch.num.messages)时批量发送,这样以提高吞吐量。
Q: async send的详情
A: 首先还是根据有无key来定位partition位置,每个partition有各自的queue,插入后会校验是否达到batch发送的条件,达到了,也是采用nio的机制,selector.wakeup唤醒和各个partition连接的socket执行批量发送。
server端消息的管理
消息何时被写入磁盘
kafka config中有三个配置项来定义这个策略(log.flush.interval.messages, log.flush.interval.ms, log.flush.scheduler.interval.ms),在描述这个策略前需要了解一点,linux里调用file.write并不是就直接将信息写入硬盘,根据操作系统的优化策略,会将其存放disk的buffer缓存,直到一定大小或者一定时间后或者直接fsync系统调用来批量将信息真正写入缓存。因此,前两个分别表示当多少条消息被写入buffer或者多久之后调用一次fsync,后面的scheduler表示多久检查一次,当前的消息数和间隔时间是否可以触发fsync了,于是,kafka的消息刷disk机制也就很明显了,并不是每个消息过来的时候检查是否触发fsync而是定时任务检查,否则,要是很久没消息过来,剩下的那部分消息岂不是永远在缓存里不能被触发了..但是,按照kafka文档上的建议,最好是不设置这些配置项,让操作系统自己来判断何时来执行fsync(实际上操作系统并不会执行fsync,操作系统只是有磁盘的延迟写策略,当一定时间间隔或者内存脏页即修改页超过一定数目后,就会把内存的脏页放入io写队列,而fsync则是阻塞到写完成的原语,sync则是将数据丢到io write队列即刻返回,并不等写完成,fdatasync则是保证数据写到disk上,不保证文件信息比如最新修改时间 文件大小之类的metadata的修改,<u>由于fdatasync的性能比fsync好,写日志如果采用fdatasync可以优化性能,但怎样保证文件大小不丢失呢,berkeley db采取的策略是每个log文件固定大小,每次新建后往log写入时从最后开始写,这样新建时log的大小就固定了,每个log只有一次需要同步metadata的开销</u>),这样可以达到性能最大化。而对于consumer端,没有fsync进硬盘没有写入segment文件的消息是不可见不可读的,这样同时有个问题,这是不是就表明了,kafka是有可能丢失消息的呢?当然,我们可以依靠多个replica来减少丢消息的可能,这台server物理挂了但是replica还是能存下,但是在极端情况下,理论上还是可能丢消息的。
消息在server端的保存
消息是被发送至每个partition里的,每个partition是一个文件夹,里面由多个segment文件组成。每个segment文件的保存期限由log.cleaner.enable、log.retention.check.interval.ms、log.retention.minutes和log.retention.bytes共同决定。log.cleaner.enable=true时,文件达到上限不会删除,而会生成压缩文件保存一段时间(有另一套类似的到期删除机制),=false时,达到上限就会删除,log.retention.check.interval.ms设置多久检查一次是否有需要删除的segment文件,log.retention.minutes设置的是log有效期,log.retention.bytes设置的是每个partition的容量,也就是该partition所有segment文件的最大总和。partition下每个segment达到一定大小后就会生成一个新的segment文件,这个大小由log.index.size.max.bytes来配置。partition文件夹的命名规则为topicName+partition序号(0开始),而每个segment文件其实由一个.index和.log文件构成,文件名为64位,20位数字字符offset值(这里的offset值表示在partition里第几个消息)。在.index文件里,格式为index:offset(segment里第几个消息,0开始:字节偏移量offset,0开始),这样就能够很快的索引到指定index的某个消息,比如找offset=174023的消息,先找到最接近的index,比如000000174018.index,然后找其中第四个消息即可。
消息在server端的副本的同步
request.required.acks=-1时,虽然语义上表示每次消息commit是等除leader外其他所有replia有响应时才返回,但是kafka的replia为了保证性能并不是将数据写入log中才响应leader,而是在接收到消息数据就立刻返回,因此当producer commit成功时,很可能replia中的消息还是只在缓存里,并未持久化到硬盘,发生异常后,也不能保证该消息被consumer消费,kafka觉得这在性能和数据持久化之间是一个还不错的权衡。实际上kafka中的同步是follower定时尝试从leader处批量同步(fetch)消息,follower在获取消息过程上就像一个普通的consumer一样,这样能保证kafka集群leader性能,因此producer发送消息到leader,leader commit本地log后,message并没有发ack回去,而是等待所有的ISR内的replia来尝试同步,确认同步完后,返回ack至producer,同时刷新hw.
server端partition多个replia的leader election
每个partition都维护一个ISR(in sync replia),由于kafka的消息同步是在完全同步和完全异步之间,因此follower的消息LEO(log end offset)和leader的有一定的延时性,根据配置replica.lag.max.messages、replica.lag.time.max.ms(如果一段时间内没有新message,follower会主动去leader处fetch,刷新lag time),若follower和leader之间的延时在配置的时间之内,则放在ISR之内(有被选举权),否则就被踢出ISR,之后若同步跟上了,会重新加入ISR。在leader中维护着一个HW(high watermark),标识着当前ISR中的最低延时消息offset,所有的consumer只能消费低于high watermark的message,这样保证了即使ISR里的某台或者某几个server包括leader挂了,只要ISR中还有server存在,消息就不会丢失。而同时,如果leader挂了,被选举为新leader的server将会要求其他replica从high watermark出开始截断,重新从新leader出同步消息重写log,保证所有replica的数据一致性,而当老leader恢复后,也要从HW出开始更新log,必然也会要丢掉一些老log。因此可以想象一个场景,producer发消息到leader时,leader挂了会怎么样?如果request.required.acks>=0,那么就是消息只在leader server本地,那么如果down机在完成前,会返回异常,down机在正常返回后,那么消息丢失。如果request.required.acks=-1的话,如果down机在完成前,返回异常,但也许部分follower已经同步了message了,如果被选为leader,则会导致消息重复发送。如果down机在正常返回后,是不会丢失的,因为虽然leader挂了,但是message已经到了新leader的内存里了,早晚也会落log的。
consumer(消费者)
消费者如何消费
每个consumer可以同时消费多个topic,每个consumer只属于一个指定的consumer group,每个topic的每条message只能发送给每个consumer group的某个consumer,若同一个group中对某个topic有多个consumer,那么这几个consumer将均匀分配partition,很显然,kafka的消息消费只能最多保证同一个partition的有序,不可能保证和producer完全一致的有序。consumer的消费是和replica一样,到leader server中去pull消息,保证server的性能,也保证了轻易实现partition消息获取的有序性。
Q: 消息是否保证at most once或者at least once或者exactly once?
A: 如果消息发送采用producer.type=async的话,at most once,若ack=-1,能保证at least once, exactly once需要靠幂等实现。在consumer端,若获取消息后,消费消息后commit,则有可能消费消息未来得及commit就crash,这样就会重复消费,如果配置成autocommit,则为commit后消费消息,这样有可能有消息没有消费到,所以消费消息后commit同时保证幂等可能是比较靠谱的办法。
感想
每个topic在server端存放多个partition顺序读写是kafka性能强劲的特色所在,但在一定场景下,这个特色也会成为它的性能瓶颈。如果将kafka用于log日志汇聚系统,topic不多,问题不大,若将它用于业务场景的mq,topic随着业务增长越来越多的话,则每个server上不同topic的partition也就越来越多,需要在不同的partition上写入消息,那么写入场景就越来越接近随机写,顺序写硬盘的优势就会越来越小,这也就是为什么kafka比较少作为大公司业务内部mq存在的原因吧。这种场景下,rocketmq应运而生。